@jetit/publisher
Advanced tools
Comparing version
{ | ||
"name": "@jetit/publisher", | ||
"version": "5.4.0", | ||
"version": "5.4.1", | ||
"type": "commonjs", | ||
@@ -5,0 +5,0 @@ "dependencies": { |
@@ -304,3 +304,3 @@ "use strict"; | ||
// Making the subscription Immutable | ||
const subscription = Object.freeze({ | ||
const subscription = { | ||
subject: bs, | ||
@@ -310,3 +310,3 @@ filter: eventFilter, | ||
keepAlive: filterKeepAlive, | ||
}); | ||
}; | ||
eventSubscriptions.set(subscriptionId, subscription); | ||
@@ -723,7 +723,11 @@ // Return early if not first subscription | ||
async acknowledgeMessage(ackKey) { | ||
const { streamName, messageId } = this.demergeMessageKey(ackKey); | ||
let { streamName, messageId } = this.demergeMessageKey(ackKey); | ||
const lastAckKey = `last_ack:${streamName}`; | ||
messageId == '0' ? '0-0' : messageId; | ||
try { | ||
// Update last acknowledged ID and acknowledge message atomically | ||
await Promise.all([this.redisGroups.xack(streamName, this.consumerGroupName, messageId), this.redisGroups.set(lastAckKey, messageId)]); | ||
await Promise.all([ | ||
this.redisGroups.xack(streamName, this.consumerGroupName, messageId), | ||
this.redisGroups.set(lastAckKey, messageId.toString()), | ||
]); | ||
} | ||
@@ -730,0 +734,0 @@ catch (error) { |
118661
0.07%2467
0.16%