🚀 Big News: Socket Acquires Coana to Bring Reachability Analysis to Every Appsec Team.Learn more
Socket
Sign inDemoInstall
Socket

@jetit/publisher

Package Overview
Dependencies
Maintainers
0
Versions
53
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@jetit/publisher - npm Package Compare versions

Comparing version

to
5.4.0

4

package.json
{
"name": "@jetit/publisher",
"version": "5.3.2",
"version": "5.4.0",
"type": "commonjs",
"dependencies": {
"@jetit/id": "^0.0.12",
"@jetit/id": "^0.0.13",
"ioredis": "^5.3.0",

@@ -8,0 +8,0 @@ "rxjs": "^7.8.0",

@@ -203,2 +203,6 @@ import { Observable } from 'rxjs';

processStoredEvents(): Promise<void>;
/**
* Acknowledges a message and updates the last acknowledged message ID.
* This is used to track cleanup progress and ensure we don't delete unprocessed messages.
*/
acknowledgeMessage(ackKey: string): Promise<void>;

@@ -205,0 +209,0 @@ private frameMessageKey;

@@ -296,7 +296,11 @@ "use strict";

listenInternals(eventName, subscriptionId, eventFilter, filterKeepAlive = 24 * 60 * 60 * 1000, publishOnceGuarantee = false, externalAcknowledgement = false) {
if (!this.subscriptions.has(eventName)) {
this.subscriptions.set(eventName, new Map());
// Get or create subscription map for this event
const eventSubscriptions = this.subscriptions.get(eventName) || new Map();
const isNewSubscription = !this.subscriptions.has(eventName);
if (isNewSubscription) {
this.subscriptions.set(eventName, eventSubscriptions);
}
const bs = new rxjs_1.BehaviorSubject(null);
const subscription = {
// Making the subscription Immutable
const subscription = Object.freeze({
subject: bs,

@@ -306,14 +310,34 @@ filter: eventFilter,

keepAlive: filterKeepAlive,
};
this.subscriptions.get(eventName).set(subscriptionId, subscription);
const timer = (0, rxjs_1.interval)(10000).subscribe(async () => {
/** Clear earlier unprocessed messages. Runs every 10 seconds */
await processMessage(this.redisGroups, '0', new tracker_1.MetricsTracker(), false);
});
eventSubscriptions.set(subscriptionId, subscription);
// Return early if not first subscription
if (!isNewSubscription) {
return bs.asObservable().pipe((0, rxjs_1.skip)(1));
}
const cleanupInterval = 10000; // 10 seconds
const timer = (0, rxjs_1.interval)(cleanupInterval).subscribe({
next: async () => {
try {
await processMessage(this.redisGroups, '0', new tracker_1.MetricsTracker(), false);
}
catch (error) {
logger_1.PUBLISHER_LOGGER.error('Error in running recurring cleanup task:', error);
}
},
error: (error) => {
logger_1.PUBLISHER_LOGGER.error('Fatal error in cleanup timer:', error);
},
});
// Create observable with proper cleanup
const observable = bs.asObservable().pipe((0, rxjs_1.skip)(1), (0, rxjs_1.finalize)(() => {
/** Cleanup timer */
timer.unsubscribe();
// Clean up subscription on completion
this.removeSubscription(eventName, subscriptionId);
}));
const streamName = `${eventName}:${this.consumerGroupName}`;
const processMessage = async (redisClient, messageId, tracker, multicast = false, processPending = false) => {
// Skip processing if subscription was removed. This is needed because the processing is independent of the subscription
if (!this.subscriptions.has(eventName)) {
return;
}
const streamName = `${eventName}:${this.consumerGroupName}`;
logger_1.PUBLISHER_LOGGER.log(`PUBLISHER: Processing message ${messageId} for ${streamName}`);

@@ -339,31 +363,46 @@ try {

let eventData;
/**
* Both multicast messages and pending messages cannot be read by xreadgroup
* Multicast messages should not be claimed by a single consumer. And pending messages
* are usually behind in the stream so XREADGROUP will not read them and hence
* they need to be read using XRANGE.
*/
tracker.startRedisOperation();
if (multicast || processPending) {
const messages = await redisClient.xrange(streamName, messageId, messageId);
if (messages && messages.length) {
try {
eventData = JSON.parse(messages[0][1][1]);
try {
/**
* Both multicast messages and pending messages cannot be read by xreadgroup
* Multicast messages should not be claimed by a single consumer. And pending messages
* are usually behind in the stream so XREADGROUP will not read them and hence
* they need to be read using XRANGE.
*/
if (multicast || processPending) {
const messages = await redisClient.xrange(streamName, messageId, messageId);
if (messages?.length) {
try {
eventData = JSON.parse(messages[0][1][1]);
}
catch (error) {
logger_1.PUBLISHER_LOGGER.error(`JSON parsing failed for message: ${messages[0][1][1]}`);
return;
}
}
catch (e) {
console.error(`JSON parsing failed for the following message ${messages[0][1][1]} in the publisher.`);
}
}
}
else {
const messages = (await redisClient.xreadgroup('GROUP', this.consumerGroupName, this.instanceId, 'COUNT', 1, 'STREAMS', streamName, '>'));
if (messages && messages.length) {
if (messageId === '0') {
messageId = messages[0][1][0][0];
logger_1.PUBLISHER_LOGGER.log(`PUBLISHER: Reprocessing unprocessed message with id: ${messageId}`);
else {
const messages = (await redisClient.xreadgroup('GROUP', this.consumerGroupName, this.instanceId, 'COUNT', 1, 'STREAMS', streamName, '>'));
if (messages?.length) {
if (messageId === '0') {
messageId = messages[0][1][0][0];
logger_1.PUBLISHER_LOGGER.log(`PUBLISHER: Reprocessing unprocessed message with id: ${messageId}`);
}
try {
eventData = JSON.parse(messages[0][1][0][1][1]);
}
catch (error) {
logger_1.PUBLISHER_LOGGER.error(`JSON parsing failed for message: ${messages[0][1][0][1][1]}`);
return;
}
}
eventData = JSON.parse(messages[0][1][0][1][1]);
}
}
tracker.endRedisOperation();
catch (error) {
logger_1.PUBLISHER_LOGGER.error('Error retrieving or parsing event data:', error);
return;
}
finally {
tracker.endRedisOperation();
}
tracker.startProcessing();

@@ -383,29 +422,38 @@ if (eventData) {

if (subscriptions) {
const currentTime = Date.now();
const subscriptionEntries = Array.from(subscriptions.entries());
for (let i = 0; i < subscriptionEntries.length; i++) {
const [subId, sub] = subscriptionEntries[i];
if (!sub.filter || sub.filter(eventData)) {
sub.subject.next({ ...eventData, ackKey });
sub.lastMatchTime = Date.now();
// Process subscriptions in parallel for better performance
await Promise.all(subscriptionEntries.map(async ([subId, sub]) => {
try {
if (!sub.filter || sub.filter(eventData)) {
sub.subject.next({ ...eventData, ackKey });
sub.lastMatchTime = currentTime;
}
else if (currentTime - sub.lastMatchTime > sub.keepAlive) {
logger_1.PUBLISHER_LOGGER.error(`PUBLISHER: No matching events for ${eventName} (Subscription ${subId}) in the last ${sub.keepAlive / 1000 / 60 / 60} hours`);
sub.lastMatchTime = currentTime;
}
}
else if (Date.now() - sub.lastMatchTime > sub.keepAlive) {
/**
* Reset the lastMatch time every day by default. For now only
* log the data. Should add functionality to remove the filter
* if its not used at all to gain minor improvements in
* performace
*/
logger_1.PUBLISHER_LOGGER.error(`PUBLISHER: No matching events for ${eventName} (Subscription ${subId}) in the last ${sub.keepAlive / 1000 / 60 / 60} hours`);
sub.lastMatchTime = Date.now();
catch (error) {
// Log error but don't fail entire processing
logger_1.PUBLISHER_LOGGER.error(`Error processing subscription ${subId}:`, error);
}
}
}));
}
if (!externalAcknowledgement)
// Acknowledge message if needed
if (!externalAcknowledgement) {
await this.acknowledgeMessage(ackKey);
}
// Update metrics
const currentTime = Date.now();
tracker.incrementMessageRate('subscribe', eventData.eventName);
const processingTime = currentTime - eventData.createdAt;
tracker.addProcessingTime(processingTime);
tracker.setConsumerLag(this.consumerGroupName, currentTime - eventData.createdAt);
}
catch (processingError) {
logger_1.PUBLISHER_LOGGER.error(`Processing error for message ${messageId}:`, processingError);
catch (error) {
logger_1.PUBLISHER_LOGGER.error(`Processing error for message ${messageId}:`, error);
const dlqEvent = {
...eventData,
failureReason: processingError.message,
failureReason: error.message,
retryCount: (eventData.retryCount || 0) + 1,

@@ -417,8 +465,4 @@ originalStream: streamName,

await this.dlq.addToDLQ(dlqEvent);
// Don't rethrow to prevent message loss
}
tracker.incrementMessageRate('subscribe', eventData.eventName);
const processingTime = Date.now() - eventData.createdAt;
tracker.addProcessingTime(processingTime);
const lag = Date.now() - eventData.createdAt;
tracker.setConsumerLag(this.consumerGroupName, lag);
}

@@ -429,3 +473,3 @@ else {

tracker.endProcessing();
/** Process Unprocessed Message if this is a main tree, otherwise limit to processing 100 messages that are unacknowledged */
/** Process Unprocessed Messages with rate limiting */
if (!processPending) {

@@ -437,6 +481,8 @@ const unprocessedMessageIds = await (0, utils_1.getUnacknowledgedMessages)(redisClient, this.consumerGroupName, streamName, this.instanceId);

}
for (const id of unprocessedMessageIds.messageIds) {
logger_1.PUBLISHER_LOGGER.log(`PUBLISHER: Reprocessing unprocessed message with id: ${id}`);
// Process messages with rate limiting
const processWithDelay = async (id, index) => {
await new Promise((resolve) => setTimeout(resolve, index * 20));
await processMessage(redisClient, id, new tracker_1.MetricsTracker(), multicast, true);
}
};
unprocessedMessageIds.messageIds.map((id, index) => processWithDelay(id, index));
}

@@ -541,34 +587,23 @@ }

const streamName = `${eventName}:${this.consumerGroupName}`;
const cleanupThreshold = Date.now() - interval;
const CHUNK_SIZE = 10000; // Process messages in chunks to avoid memory issues
const lastAckKey = `last_ack:${streamName}`;
const oneHourAgo = Date.now() - interval;
try {
// Get pending info for this consumer group once
const pendingInfo = await this.redisGroups.xpending(streamName, this.consumerGroupName);
const hasPendingMessages = pendingInfo && pendingInfo[0] > 0;
let lastId = '-'; // Start from the beginning of the stream
while (true) {
// Get a chunk of messages
const messages = await this.redisGroups.xrange(streamName, lastId, cleanupThreshold.toString(), 'COUNT', CHUNK_SIZE);
if (!messages || messages.length === 0)
break;
// Update lastId for next iteration (exclusive)
lastId = messages[messages.length - 1][0];
if (hasPendingMessages) {
// If there are pending messages, we need to check each message
const pendingDetails = (await this.redisGroups.xpending(streamName, this.consumerGroupName, lastId, '+', CHUNK_SIZE, this.instanceId));
const pendingIds = new Set(pendingDetails.map((detail) => detail[0]));
const acknowledgedIds = messages.map((msg) => msg[0]).filter((id) => !pendingIds.has(id));
if (acknowledgedIds.length > 0) {
await this.redisGroups.xdel(streamName, ...acknowledgedIds);
}
}
else {
// If no pending messages, we can safely delete all messages in this chunk
const messageIds = messages.map((msg) => msg[0]);
await this.redisGroups.xdel(streamName, ...messageIds);
}
// If we got less than CHUNK_SIZE messages, we've reached the end
if (messages.length < CHUNK_SIZE)
break;
// Get consumer group info to check if consumers are active
const groupInfo = (await this.redisGroups.xinfo('GROUPS', streamName));
// If no active consumers, leave stream as is
if (!groupInfo || !groupInfo.some((group) => group.consumers > 0)) {
logger_1.PUBLISHER_LOGGER.log(`PUBLISHER: No active consumers for ${streamName}, leaving stream as is`);
return;
}
// Get last acknowledged message ID
const lastAckId = await this.redisGroups.get(lastAckKey);
if (!lastAckId) {
logger_1.PUBLISHER_LOGGER.log(`PUBLISHER: No acknowledged messages for ${streamName}`);
return;
}
// Extract timestamp from message ID
const [timestamp] = lastAckId.split('-').map(Number);
const cleanupThreshold = Math.min(timestamp, oneHourAgo);
await this.redisGroups.xtrim(streamName, 'MINID', cleanupThreshold);
logger_1.PUBLISHER_LOGGER.log(`PUBLISHER: Cleaned up messages before last acknowledged message ${timestamp} from ${streamName}`);
}

@@ -689,5 +724,17 @@ catch (error) {

}
/**
* Acknowledges a message and updates the last acknowledged message ID.
* This is used to track cleanup progress and ensure we don't delete unprocessed messages.
*/
async acknowledgeMessage(ackKey) {
const { streamName, messageId } = this.demergeMessageKey(ackKey);
await this.redisGroups.xack(streamName, this.consumerGroupName, messageId);
const lastAckKey = `last_ack:${streamName}`;
try {
// Update last acknowledged ID and acknowledge message atomically
await Promise.all([this.redisGroups.xack(streamName, this.consumerGroupName, messageId), this.redisGroups.set(lastAckKey, messageId)]);
}
catch (error) {
logger_1.PUBLISHER_LOGGER.error(`Error acknowledging message ${messageId} for ${streamName}:`, error);
throw error;
}
}

@@ -694,0 +741,0 @@ frameMessageKey(streamName, messageId) {