@jetit/publisher
Advanced tools
Comparing version
{ | ||
"name": "@jetit/publisher", | ||
"version": "4.0.0", | ||
"version": "4.1.0", | ||
"type": "commonjs", | ||
@@ -5,0 +5,0 @@ "dependencies": { |
@@ -124,12 +124,23 @@ import { Observable } from 'rxjs'; | ||
private cleanupAcknowledgedMessages; | ||
/** | ||
* This function should be added to Surf Signal to publish periodic diagnostic information | ||
* on the health of the stream | ||
*/ | ||
getUnacknowledgedMessagesForStream(eventName: string): Promise<{ | ||
count: number; | ||
countOnThisConsumer?: number | undefined; | ||
messageIds: string[]; | ||
messages?: unknown[] | undefined; | ||
getDiagnosticData(events: string[]): Promise<{ | ||
status: string; | ||
message: string; | ||
data?: undefined; | ||
} | { | ||
status: string; | ||
data: { | ||
eventName: string; | ||
consumerGroupMap: { | ||
consumerGroup: string; | ||
diagnostics: Generator<Promise<unknown[]> | { | ||
count: number; | ||
consumers: { | ||
consumerName: string; | ||
pendingCount: string; | ||
}[]; | ||
}, void, [number, string, string, string[][]]>; | ||
}[]; | ||
}[]; | ||
message: string; | ||
}>; | ||
} |
@@ -320,12 +320,19 @@ "use strict"; | ||
} | ||
/** | ||
* This function should be added to Surf Signal to publish periodic diagnostic information | ||
* on the health of the stream | ||
*/ | ||
async getUnacknowledgedMessagesForStream(eventName) { | ||
const streamName = `${eventName}:${this.consumerGroupName}`; | ||
const unprocessedMessageIds = await (0, utils_1.getUnacknowledgedMessages)(this.redisGroups, this.consumerGroupName, streamName, this.instanceId); | ||
return unprocessedMessageIds; | ||
async getDiagnosticData(events) { | ||
if (events.length > 100) { | ||
return { status: 'ERROR', message: 'Please pass in a maximum of 100 elements to fetch diagnostics' }; | ||
} | ||
const tempPromises = events.map(async (eventName) => { | ||
const consumerGroups = await (0, utils_1.getAllConsumerGroups)(eventName, this.redisPublisher); | ||
const consumerGroupMap = await Promise.all(consumerGroups.map(async (consumerGroup) => { | ||
const streamName = `${eventName}:${consumerGroup}`; | ||
const diagnostics = await (0, utils_1.getSummaryOnStreamConsumerGroup)(this.redisGroups, consumerGroup, streamName); | ||
return { consumerGroup, diagnostics }; | ||
})); | ||
return { eventName, consumerGroupMap }; | ||
}); | ||
const returnData = await await Promise.all(tempPromises); | ||
return { status: 'SUCCESS', data: returnData, message: 'We recommend not running this in times of heavy load' }; | ||
} | ||
} | ||
exports.Streams = Streams; |
import { RedisType } from './registry'; | ||
import { EventData } from './types'; | ||
export declare function getAllConsumerGroups(eventName: string, redisConnection: RedisType): Promise<string[]>; | ||
export declare function getSummaryOnStreamConsumerGroup(redisClient: RedisType, consumerGroupName: string, streamName: string): Generator<Promise<unknown[]> | { | ||
count: number; | ||
consumers: { | ||
consumerName: string; | ||
pendingCount: string; | ||
}[]; | ||
}, void, [number, string, string, string[][]]>; | ||
export declare function getUnacknowledgedMessages(redisClient: RedisType, consumerGroupName: string, streamName: string, consumerName: string, count?: number): Promise<{ | ||
@@ -5,0 +12,0 @@ count: number; |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.UTILS = exports.decodeScheduledMessage = exports.encodeScheduledMessage = exports.removedScheduledJob = exports.notifySubscribers = exports.getMessageStatesCount = exports.getUnacknowledgedMessages = exports.getAllConsumerGroups = void 0; | ||
exports.UTILS = exports.decodeScheduledMessage = exports.encodeScheduledMessage = exports.removedScheduledJob = exports.notifySubscribers = exports.getMessageStatesCount = exports.getUnacknowledgedMessages = exports.getSummaryOnStreamConsumerGroup = exports.getAllConsumerGroups = void 0; | ||
const logger_1 = require("./logger"); | ||
@@ -10,2 +10,17 @@ async function getAllConsumerGroups(eventName, redisConnection) { | ||
exports.getAllConsumerGroups = getAllConsumerGroups; | ||
function* getSummaryOnStreamConsumerGroup(redisClient, consumerGroupName, streamName) { | ||
const [count, , , consumers] = (yield redisClient.xpending(streamName, consumerGroupName)); | ||
yield { | ||
count, | ||
consumers: consumers | ||
? consumers.map((x) => { | ||
return { | ||
consumerName: x[0], | ||
pendingCount: x[1], | ||
}; | ||
}) | ||
: [], | ||
}; | ||
} | ||
exports.getSummaryOnStreamConsumerGroup = getSummaryOnStreamConsumerGroup; | ||
async function getUnacknowledgedMessages(redisClient, consumerGroupName, streamName, consumerName, count = 500) { | ||
@@ -12,0 +27,0 @@ try { |
52258
3.33%915
4.69%