Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

redis-streams-broker

Package Overview
Dependencies
Maintainers
1
Versions
15
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

redis-streams-broker - npm Package Compare versions

Comparing version 0.0.9 to 0.0.10

18

index.js

@@ -46,3 +46,3 @@ const shortid = require("shortid");

try {
const messages = await this._redisClient.xreadgroup("GROUP", groupName, consumerName, "COUNT", payloadsToFetch, "STREAMS", this._channelName, (readPending === false ? ">" : "0"));
const messages = await this._redisClient.xreadgroup("GROUP", groupName, consumerName, "COUNT", payloadsToFetch, "BLOCK", pollSpan, "STREAMS", this._channelName, (readPending === false ? ">" : "0"));
if (messages !== null) {

@@ -62,3 +62,3 @@ let streamPayloads = this._transformResponseToMessage(messages, groupName);

}
}, pollSpan);
}, 0);
let subscriptions = this._activeSubscriptions.get(subscriptionHandle) || [];

@@ -103,6 +103,14 @@ subscriptions.push(intervalHandle);

payload["markAsRead"] = async (dropMessage) => await this._acknowledgeMessage(groupName, messageId, dropMessage);
for (let propertyIdx = 0; propertyIdx < responses[responseIdx][1][messageIdIdx][1].length;) {
payload.payload[responses[responseIdx][1][messageIdIdx][1][propertyIdx]] = responses[responseIdx][1][messageIdIdx][1][propertyIdx + 1];
propertyIdx += 2;
payload["markAsRead"] = async (dropMessage) => await this._acknowledgeMessage(groupName, messageId, dropMessage);
if (responses[responseIdx][1][messageIdIdx][1] == null) {
//This happens when actual message is rolled over but its still in pending list of the consumer.
//Or someone deleted the message from Redis while it was still pending.
payload.payload = null;
}
else {
for (let propertyIdx = 0; propertyIdx < responses[responseIdx][1][messageIdIdx][1].length;) {
payload.payload[responses[responseIdx][1][messageIdIdx][1][propertyIdx]] = responses[responseIdx][1][messageIdIdx][1][propertyIdx + 1];
propertyIdx += 2;
}
}
payloads.push(payload);

@@ -109,0 +117,0 @@ }

{
"name": "redis-streams-broker",
"version": "0.0.9",
"version": "0.0.10",
"description": "This package is a broker to redis stream data type, This package provides guaranteed message delivery feature with acknowledgement.",

@@ -10,3 +10,4 @@ "main": "index.js",

"redisstop": "docker stop streamz",
"redisstart": "npm run redisstop || node -v && docker run --name streamz -p 6379:6379 -itd --rm redis:latest"
"redisstart": "npm run redisstop || node -v && docker run --name streamz -p 6379:6379 -itd --rm redis:latest",
"rediscli": "docker exec -it streamz redis-cli"
},

@@ -13,0 +14,0 @@ "repository": {

@@ -22,2 +22,3 @@ # redis-streams-broker

2. Please find example code for injectable custom client [here](https://github.com/LRagji/redis-streams-broker/blob/master/examples/custom.js)
3. Please find multi threading examples [here](https://github.com/LRagji/redis-streams-broker/tree/master/examples/H-Scalling%20idempotent%20operation)

@@ -42,6 +43,6 @@ ```javascript

// Handler for arriving Payload
async function newMessageHandler(payload) {
for (let index = 0; index < payload.length; index++) {
async function newMessageHandler(payloads) {
for (let index = 0; index < payloads.length; index++) {
try {
const element = payload[index];
const element = payloads[index];
console.log("Payload Id:", element.id); //Payload Id

@@ -80,3 +81,3 @@ console.log("Payload Received from :", element.channel); //Stream name

## Current Version:
0.0.9[Beta]
0.0.10[Beta]

@@ -83,0 +84,0 @@ ## License

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc