redis-streams-broker
Advanced tools
Comparing version 0.0.9 to 0.0.10
18
index.js
@@ -46,3 +46,3 @@ const shortid = require("shortid"); | ||
try { | ||
const messages = await this._redisClient.xreadgroup("GROUP", groupName, consumerName, "COUNT", payloadsToFetch, "STREAMS", this._channelName, (readPending === false ? ">" : "0")); | ||
const messages = await this._redisClient.xreadgroup("GROUP", groupName, consumerName, "COUNT", payloadsToFetch, "BLOCK", pollSpan, "STREAMS", this._channelName, (readPending === false ? ">" : "0")); | ||
if (messages !== null) { | ||
@@ -62,3 +62,3 @@ let streamPayloads = this._transformResponseToMessage(messages, groupName); | ||
} | ||
}, pollSpan); | ||
}, 0); | ||
let subscriptions = this._activeSubscriptions.get(subscriptionHandle) || []; | ||
@@ -103,6 +103,14 @@ subscriptions.push(intervalHandle); | ||
payload["markAsRead"] = async (dropMessage) => await this._acknowledgeMessage(groupName, messageId, dropMessage); | ||
for (let propertyIdx = 0; propertyIdx < responses[responseIdx][1][messageIdIdx][1].length;) { | ||
payload.payload[responses[responseIdx][1][messageIdIdx][1][propertyIdx]] = responses[responseIdx][1][messageIdIdx][1][propertyIdx + 1]; | ||
propertyIdx += 2; | ||
payload["markAsRead"] = async (dropMessage) => await this._acknowledgeMessage(groupName, messageId, dropMessage); | ||
if (responses[responseIdx][1][messageIdIdx][1] == null) { | ||
//This happens when actual message is rolled over but its still in pending list of the consumer. | ||
//Or someone deleted the message from Redis while it was still pending. | ||
payload.payload = null; | ||
} | ||
else { | ||
for (let propertyIdx = 0; propertyIdx < responses[responseIdx][1][messageIdIdx][1].length;) { | ||
payload.payload[responses[responseIdx][1][messageIdIdx][1][propertyIdx]] = responses[responseIdx][1][messageIdIdx][1][propertyIdx + 1]; | ||
propertyIdx += 2; | ||
} | ||
} | ||
payloads.push(payload); | ||
@@ -109,0 +117,0 @@ } |
{ | ||
"name": "redis-streams-broker", | ||
"version": "0.0.9", | ||
"version": "0.0.10", | ||
"description": "This package is a broker to redis stream data type, This package provides guaranteed message delivery feature with acknowledgement.", | ||
@@ -10,3 +10,4 @@ "main": "index.js", | ||
"redisstop": "docker stop streamz", | ||
"redisstart": "npm run redisstop || node -v && docker run --name streamz -p 6379:6379 -itd --rm redis:latest" | ||
"redisstart": "npm run redisstop || node -v && docker run --name streamz -p 6379:6379 -itd --rm redis:latest", | ||
"rediscli": "docker exec -it streamz redis-cli" | ||
}, | ||
@@ -13,0 +14,0 @@ "repository": { |
@@ -22,2 +22,3 @@ # redis-streams-broker | ||
2. Please find example code for injectable custom client [here](https://github.com/LRagji/redis-streams-broker/blob/master/examples/custom.js) | ||
3. Please find multi threading examples [here](https://github.com/LRagji/redis-streams-broker/tree/master/examples/H-Scalling%20idempotent%20operation) | ||
@@ -42,6 +43,6 @@ ```javascript | ||
// Handler for arriving Payload | ||
async function newMessageHandler(payload) { | ||
for (let index = 0; index < payload.length; index++) { | ||
async function newMessageHandler(payloads) { | ||
for (let index = 0; index < payloads.length; index++) { | ||
try { | ||
const element = payload[index]; | ||
const element = payloads[index]; | ||
console.log("Payload Id:", element.id); //Payload Id | ||
@@ -80,3 +81,3 @@ console.log("Payload Received from :", element.channel); //Stream name | ||
## Current Version: | ||
0.0.9[Beta] | ||
0.0.10[Beta] | ||
@@ -83,0 +84,0 @@ ## License |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
19892
196
169