@jetit/publisher - npm Package Compare versions

Comparing version 5.4.1 to 5.5.1

package.json
{
"name": "@jetit/publisher",
"version": "5.4.1",
"version": "5.5.1",
"type": "commonjs",

@@ -5,0 +5,0 @@ "dependencies": {

"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.MetricsCollector = void 0;
const logger_1 = require("../redis/logger");
class MetricsCollector {

@@ -119,9 +120,24 @@ constructor(config, dlq) {

try {
+ // Verify if the key is a stream before proceeding
+ const keyType = await this.redisClient.type(streamKey);
+ if (keyType !== 'stream') {
+ continue; // Skip non-stream keys
+ }
// Get stream length and pending info
const streamLength = await this.redisClient.xlen(streamKey);
// Extract consumer group name from stream key (format: eventName:cg-serviceName)
- const consumerGroup = streamKey.split(':')[1];
+ const [eventName, consumerGroup] = streamKey.split(':');
+ if (!consumerGroup?.startsWith('cg-')) {
+ continue; // Skip if key format doesn't match expected pattern
+ }
// XPENDING returns [count, min-id, max-id, consumer-list]
- const pendingInfo = await this.redisClient.xpending(streamKey, consumerGroup);
- const totalPending = pendingInfo ? Number(pendingInfo[0]) : 0;
+ let totalPending = 0;
+ try {
+ const pendingInfo = await this.redisClient.xpending(streamKey, consumerGroup);
+ totalPending = pendingInfo ? Number(pendingInfo[0]) : 0;
+ }
+ catch (error) {
+ logger_1.PUBLISHER_LOGGER.error(`Error getting pending info for stream ${streamKey}:`, error);
+ // Continue with totalPending as 0 if XPENDING fails
+ }
const queueDepth = Math.max(0, streamLength - totalPending);

@@ -132,3 +148,3 @@ totalDepth += queueDepth;

catch (error) {
- console.error(`Error processing key ${streamKey}:`, error);
+ logger_1.PUBLISHER_LOGGER.error(`Error processing key ${streamKey}:`, error);
continue;

@@ -135,0 +151,0 @@ }
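
For context on the MetricsCollector change above: stream keys are expected to follow the eventName:cg-serviceName convention, and XPENDING in its summary form replies with [count, min-id, max-id, consumer-list]. A minimal sketch of that lookup, assuming an ioredis-style client; the names here are illustrative and not taken from the package source:

const Redis = require('ioredis');
const redis = new Redis();

// Returns the pending-entry count for a stream key shaped like eventName:cg-serviceName.
async function pendingCount(streamKey) {
    const [eventName, consumerGroup] = streamKey.split(':');
    if (!consumerGroup || !consumerGroup.startsWith('cg-')) {
        return 0; // Key does not match the expected convention
    }
    // Summary XPENDING reply: [count, minId, maxId, [[consumer, pending], ...]]
    const pendingInfo = await redis.xpending(streamKey, consumerGroup);
    return pendingInfo ? Number(pendingInfo[0]) : 0;
}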

@@ -21,3 +21,7 @@ "use strict";

const eventJson = JSON.stringify(event);
- await this.redisClient.multi().hset(DLQ_HASH_KEY, event.eventId, eventJson).zadd(DLQ_ZSET_KEY, event.timestamp, event.eventId).exec();
+ // Execute commands separately to avoid CROSSSLOT error in cluster mode
+ await Promise.all([
+ this.redisClient.hset(DLQ_HASH_KEY, event.eventId, eventJson),
+ this.redisClient.zadd(DLQ_ZSET_KEY, event.timestamp, event.eventId),
+ ]);
await this.incrementRateLimit();
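
The reason for the Promise.all rewrites in this file: in Redis Cluster, a MULTI/EXEC transaction is rejected with a CROSSSLOT error when its keys hash to different slots, and DLQ_HASH_KEY and DLQ_ZSET_KEY are separate keys. Issuing the commands independently avoids the error at the cost of atomicity. A hedged alternative, not what this package does, is to put a shared hash tag in both key names so they land in the same slot and the transaction can stay:

// Illustrative key names only; the package's real constants differ.
const DLQ_HASH_KEY = '{dlq}:events';      // the "{dlq}" hash tag pins the cluster slot
const DLQ_ZSET_KEY = '{dlq}:timestamps';  // same tag, same slot

async function addToDeadLetterQueue(redisClient, event) {
    // With both keys in one slot, the atomic MULTI/EXEC also works in cluster mode.
    await redisClient
        .multi()
        .hset(DLQ_HASH_KEY, event.eventId, JSON.stringify(event))
        .zadd(DLQ_ZSET_KEY, event.timestamp, event.eventId)
        .exec();
}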

@@ -73,4 +77,8 @@ logger_1.PUBLISHER_LOGGER.log(`DLQ: Added event ${event.eventId} to Dead Letter Queue`);

try {
- const removed = await this.redisClient.multi().hdel(DLQ_HASH_KEY, eventId).zrem(DLQ_ZSET_KEY, eventId).exec();
- if (removed && removed[0][1] === 1 && removed[1][1] === 1) {
+ // Execute commands separately to avoid CROSSSLOT error in cluster mode
+ const [hdelResult, zremResult] = await Promise.all([
+ this.redisClient.hdel(DLQ_HASH_KEY, eventId),
+ this.redisClient.zrem(DLQ_ZSET_KEY, eventId),
+ ]);
+ if (hdelResult === 1 && zremResult === 1) {
logger_1.PUBLISHER_LOGGER.log(`DLQ: Successfully removed event ${eventId} from Dead Letter Queue`);

@@ -95,7 +103,7 @@ return true;

if (expiredEventIds.length > 0) {
- await this.redisClient
- .multi()
- .hdel(DLQ_HASH_KEY, ...expiredEventIds)
- .zremrangebyscore(DLQ_ZSET_KEY, 0, cutoffTime)
- .exec();
+ // Execute commands separately to avoid CROSSSLOT error in cluster mode
+ await Promise.all([
+ this.redisClient.hdel(DLQ_HASH_KEY, ...expiredEventIds),
+ this.redisClient.zremrangebyscore(DLQ_ZSET_KEY, 0, cutoffTime),
+ ]);
}
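
The cleanup hunk above deletes expiredEventIds from the hash and trims the sorted set by score, but the lookup that produces expiredEventIds sits outside the hunk. A plausible sketch of that step, assuming the sorted-set scores are event timestamps (as the earlier zadd call suggests); the helper name is hypothetical:

// Hypothetical helper; the package's actual lookup is not shown in this diff.
async function findExpiredEventIds(redisClient, zsetKey, cutoffTime) {
    // Every member scored at or below the cutoff timestamp is considered expired.
    return redisClient.zrangebyscore(zsetKey, 0, cutoffTime);
}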

@@ -110,12 +118,5 @@ logger_1.PUBLISHER_LOGGER.log(`DLQ: Cleaned up ${expiredEventIds.length} expired events from Dead Letter Queue`);

try {
- const results = await this.redisClient.multi().zcard(DLQ_ZSET_KEY).get(DLQ_RATE_LIMIT_KEY).exec();
- if (!results) {
- throw new Error('Failed to execute Redis commands');
- }
- const [sizeResult, additionRateResult] = results;
- if (sizeResult[0] || additionRateResult[0]) {
- throw new Error('Error executing Redis commands');
- }
- const size = sizeResult[1];
- const additionRate = parseInt(additionRateResult[1] || '0', 10);
+ // Execute commands separately to avoid CROSSSLOT error in cluster mode
+ const [size, additionRateStr] = await Promise.all([this.redisClient.zcard(DLQ_ZSET_KEY), this.redisClient.get(DLQ_RATE_LIMIT_KEY)]);
+ const additionRate = parseInt(additionRateStr || '0', 10);
return { size, additionRate };

@@ -133,6 +134,10 @@ }

async incrementRateLimit() {
- await this.redisClient.incr(DLQ_RATE_LIMIT_KEY);
- await this.redisClient.expire(DLQ_RATE_LIMIT_KEY, 60); // Reset rate limit after 1 minute
+ // These commands operate on the same key so they can be combined in cluster mode
+ await this.redisClient
+ .multi()
+ .incr(DLQ_RATE_LIMIT_KEY)
+ .expire(DLQ_RATE_LIMIT_KEY, 60) // Reset rate limit after 1 minute
+ .exec();
}
}
exports.DeadLetterQueue = DeadLetterQueue;
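
incrementRateLimit keeps its MULTI because INCR and EXPIRE hit the same key, so there is no slot-crossing to worry about; the trade-off is that the 60-second TTL is re-armed on every increment. A hedged variation, not what the package ships, that fixes the window at 60 seconds from the first increment:

// Sketch only; the key name and 60-second window follow the code above, but the
// "arm the TTL once" behaviour is an assumption, not the package's behaviour.
async function incrementRateLimit(redisClient, rateLimitKey) {
    const count = await redisClient.incr(rateLimitKey);
    if (count === 1) {
        // First increment in this window: set the expiry once.
        await redisClient.expire(rateLimitKey, 60);
    }
    return count;
}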