@jetit/publisher
Advanced tools
Comparing version
{ | ||
"name": "@jetit/publisher", | ||
"version": "5.4.1", | ||
"version": "5.5.1", | ||
"type": "commonjs", | ||
@@ -5,0 +5,0 @@ "dependencies": { |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.MetricsCollector = void 0; | ||
const logger_1 = require("../redis/logger"); | ||
class MetricsCollector { | ||
@@ -119,9 +120,24 @@ constructor(config, dlq) { | ||
try { | ||
// Verify if the key is a stream before proceeding | ||
const keyType = await this.redisClient.type(streamKey); | ||
if (keyType !== 'stream') { | ||
continue; // Skip non-stream keys | ||
} | ||
// Get stream length and pending info | ||
const streamLength = await this.redisClient.xlen(streamKey); | ||
// Extract consumer group name from stream key (format: eventName:cg-serviceName) | ||
const consumerGroup = streamKey.split(':')[1]; | ||
const [eventName, consumerGroup] = streamKey.split(':'); | ||
if (!consumerGroup?.startsWith('cg-')) { | ||
continue; // Skip if key format doesn't match expected pattern | ||
} | ||
// XPENDING returns [count, min-id, max-id, consumer-list] | ||
const pendingInfo = await this.redisClient.xpending(streamKey, consumerGroup); | ||
const totalPending = pendingInfo ? Number(pendingInfo[0]) : 0; | ||
let totalPending = 0; | ||
try { | ||
const pendingInfo = await this.redisClient.xpending(streamKey, consumerGroup); | ||
totalPending = pendingInfo ? Number(pendingInfo[0]) : 0; | ||
} | ||
catch (error) { | ||
logger_1.PUBLISHER_LOGGER.error(`Error getting pending info for stream ${streamKey}:`, error); | ||
// Continue with totalPending as 0 if XPENDING fails | ||
} | ||
const queueDepth = Math.max(0, streamLength - totalPending); | ||
@@ -132,3 +148,3 @@ totalDepth += queueDepth; | ||
catch (error) { | ||
console.error(`Error processing key ${streamKey}:`, error); | ||
logger_1.PUBLISHER_LOGGER.error(`Error processing key ${streamKey}:`, error); | ||
continue; | ||
@@ -135,0 +151,0 @@ } |
@@ -21,3 +21,7 @@ "use strict"; | ||
const eventJson = JSON.stringify(event); | ||
await this.redisClient.multi().hset(DLQ_HASH_KEY, event.eventId, eventJson).zadd(DLQ_ZSET_KEY, event.timestamp, event.eventId).exec(); | ||
// Execute commands separately to avoid CROSSSLOT error in cluster mode | ||
await Promise.all([ | ||
this.redisClient.hset(DLQ_HASH_KEY, event.eventId, eventJson), | ||
this.redisClient.zadd(DLQ_ZSET_KEY, event.timestamp, event.eventId), | ||
]); | ||
await this.incrementRateLimit(); | ||
@@ -73,4 +77,8 @@ logger_1.PUBLISHER_LOGGER.log(`DLQ: Added event ${event.eventId} to Dead Letter Queue`); | ||
try { | ||
const removed = await this.redisClient.multi().hdel(DLQ_HASH_KEY, eventId).zrem(DLQ_ZSET_KEY, eventId).exec(); | ||
if (removed && removed[0][1] === 1 && removed[1][1] === 1) { | ||
// Execute commands separately to avoid CROSSSLOT error in cluster mode | ||
const [hdelResult, zremResult] = await Promise.all([ | ||
this.redisClient.hdel(DLQ_HASH_KEY, eventId), | ||
this.redisClient.zrem(DLQ_ZSET_KEY, eventId), | ||
]); | ||
if (hdelResult === 1 && zremResult === 1) { | ||
logger_1.PUBLISHER_LOGGER.log(`DLQ: Successfully removed event ${eventId} from Dead Letter Queue`); | ||
@@ -95,7 +103,7 @@ return true; | ||
if (expiredEventIds.length > 0) { | ||
await this.redisClient | ||
.multi() | ||
.hdel(DLQ_HASH_KEY, ...expiredEventIds) | ||
.zremrangebyscore(DLQ_ZSET_KEY, 0, cutoffTime) | ||
.exec(); | ||
// Execute commands separately to avoid CROSSSLOT error in cluster mode | ||
await Promise.all([ | ||
this.redisClient.hdel(DLQ_HASH_KEY, ...expiredEventIds), | ||
this.redisClient.zremrangebyscore(DLQ_ZSET_KEY, 0, cutoffTime), | ||
]); | ||
} | ||
@@ -110,12 +118,5 @@ logger_1.PUBLISHER_LOGGER.log(`DLQ: Cleaned up ${expiredEventIds.length} expired events from Dead Letter Queue`); | ||
try { | ||
const results = await this.redisClient.multi().zcard(DLQ_ZSET_KEY).get(DLQ_RATE_LIMIT_KEY).exec(); | ||
if (!results) { | ||
throw new Error('Failed to execute Redis commands'); | ||
} | ||
const [sizeResult, additionRateResult] = results; | ||
if (sizeResult[0] || additionRateResult[0]) { | ||
throw new Error('Error executing Redis commands'); | ||
} | ||
const size = sizeResult[1]; | ||
const additionRate = parseInt(additionRateResult[1] || '0', 10); | ||
// Execute commands separately to avoid CROSSSLOT error in cluster mode | ||
const [size, additionRateStr] = await Promise.all([this.redisClient.zcard(DLQ_ZSET_KEY), this.redisClient.get(DLQ_RATE_LIMIT_KEY)]); | ||
const additionRate = parseInt(additionRateStr || '0', 10); | ||
return { size, additionRate }; | ||
@@ -133,6 +134,10 @@ } | ||
async incrementRateLimit() { | ||
await this.redisClient.incr(DLQ_RATE_LIMIT_KEY); | ||
await this.redisClient.expire(DLQ_RATE_LIMIT_KEY, 60); // Reset rate limit after 1 minute | ||
// These commands operate on the same key so they can be combined in cluster mode | ||
await this.redisClient | ||
.multi() | ||
.incr(DLQ_RATE_LIMIT_KEY) | ||
.expire(DLQ_RATE_LIMIT_KEY, 60) // Reset rate limit after 1 minute | ||
.exec(); | ||
} | ||
} | ||
exports.DeadLetterQueue = DeadLetterQueue; |
119849
1%2488
0.85%