Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

redis-streams-broker

Package Overview
Dependencies
Maintainers
1
Versions
15
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

redis-streams-broker - npm Package Compare versions

Comparing version 0.0.6 to 0.0.7

lua/create-group.lua

7

index.d.ts

@@ -0,7 +1,8 @@

import redisNs from '@types/ioredis'
export declare class StreamChannelBroker {
constructor(redisConnectionString: string, channelName: string);
constructor(redisClient: redisNs.Redis, channelName: string);
publish(payload: any, maximumApproximateMessages?: number): Promise<string>;
destroy(): Promise<boolean>;
joinConsumerGroup(groupName: string, readFrom: string): Promise<ConsumerGroup>;
memoryFootprint():Promise<integer>;
memoryFootprint(): Promise<integer>;
}

@@ -21,3 +22,3 @@

payload: any;
markAsRead(): Promise<boolean>;
markAsRead(dropMessage?: boolean): Promise<boolean>;
}

@@ -24,0 +25,0 @@

@@ -1,8 +0,10 @@

const redisType = require("ioredis");
const shortid = require("shortid");
const scripto = require('redis-scripto');
const path = require('path');
class StreamChannelBroker {
#scriptingEngine;
constructor(redisConnectionString, channelName) {
this._redisClient = new redisType(redisConnectionString);
constructor(redisClient, channelName) {
this._redisClient = redisClient;
this._destroying = false;

@@ -21,2 +23,12 @@ this._channelName = channelName;

this._groupPendingSummary = this._destroyingCheckWrapper(this._groupPendingSummary.bind(this), false);
this.#scriptingEngine = new scripto(this._redisClient);
this.#scriptingEngine.loadFromDir(path.resolve(path.dirname(__filename), 'lua'));
this.#scriptingEngine.runLuaScriptAsync = async (scriptName, keys, args) => new Promise((resolve, reject) => this.#scriptingEngine.run(scriptName, keys, args, function (err, result) {
if (err != undefined) {
reject(err);
return;
}
resolve(result)
}));
this.#scriptingEngine.runLuaScriptAsync.bind(this.#scriptingEngine);
}

@@ -73,4 +85,12 @@

async _acknowledgeMessage(groupName, messageId) {
let result = await this._redisClient.xack(this._channelName, groupName, messageId);
async _acknowledgeMessage(groupName, messageId, dropMessage = false) {
let result;
if (dropMessage === false) {
result = await this._redisClient.xack(this._channelName, groupName, messageId);
} else {
result = await this._redisClient.multi()
.xack(this._channelName, groupName, messageId)
.xdel(this._channelName, messageId)
.exec();
}
return result === 1;

@@ -85,3 +105,4 @@ }

let messageId = responses[responseIdx][1][messageIdIdx][0];
let payload = { "channel": streamName, "id": messageId, "markAsRead": async () => await this._acknowledgeMessage(groupName, messageId), payload: {} };
let payload = { "channel": streamName, "id": messageId, payload: {} };
payload["markAsRead"] = async (dropMessage) => await this._acknowledgeMessage(groupName, messageId, dropMessage);
for (let propertyIdx = 0; propertyIdx < responses[responseIdx][1][messageIdIdx][1].length;) {

@@ -108,13 +129,3 @@ payload.payload[responses[responseIdx][1][messageIdIdx][1][propertyIdx]] = responses[responseIdx][1][messageIdIdx][1][propertyIdx + 1];

async joinConsumerGroup(groupName, readFrom = '$') {
const keyExists = await this._redisClient.exists(this._channelName);
if (keyExists === 1) {
const existingGroups = await this._redisClient.xinfo("GROUPS", this._channelName);
if (existingGroups.find(e => e[1] === groupName) === undefined) {
await this._redisClient.xgroup("CREATE", this._channelName, groupName, readFrom);
}
}
else {
await this._redisClient.xgroup("CREATE", this._channelName, groupName, readFrom, "MKSTREAM");
}
await this.#scriptingEngine.runLuaScriptAsync("create-group", [this._channelName], [groupName, readFrom]);
return {

@@ -151,8 +162,12 @@ "name": groupName,

}
return await this._redisClient.xadd(this._channelName, 'MAXLEN', '~', maximumApproximateMessages, '*', ...keyValuePairs);
if (maximumApproximateMessages < 0) {
return await this._redisClient.xadd(this._channelName, '*', ...keyValuePairs);
}
else {
return await this._redisClient.xadd(this._channelName, 'MAXLEN', '~', maximumApproximateMessages, '*', ...keyValuePairs);
}
}
async memoryFootprint() {
return await this._redisClient.memory("usage", this._channelName,"samples",0);
return await this._redisClient.memory("usage", this._channelName, "samples", 0);
}

@@ -163,4 +178,2 @@

let result = Array.from(this._activeSubscriptions.keys).reduce(((pre, handle) => this._unsubscribe(handle) & pre), true);
await this._redisClient.quit();
await this._redisClient.disconnect();
return result;

@@ -167,0 +180,0 @@ }

{
"name": "redis-streams-broker",
"version": "0.0.6",
"version": "0.0.7",
"description": "This package is a broker to redis stream data type, This package provides guaranteed message delivery feature with acknowledgement.",

@@ -26,3 +26,3 @@ "main": "index.js",

"author": "Laukik",
"license": "SEE LICENSE IN <license.md>",
"license": "SEE LICENSE IN license.md",
"bugs": {

@@ -33,3 +33,5 @@ "url": "https://github.com/LRagji/redis-streams-broker/issues"

"dependencies": {
"@types/ioredis": "^4.17.6",
"ioredis": "^4.16.2",
"redis-scripto": "^0.1.3",
"shortid": "^2.2.15"

@@ -36,0 +38,0 @@ },

# redis-streams-broker
This simple package is based on redis streams data type which provides you with following features
1. Centeralized Que. (Using Redis)
1. Centralized Que. (Using Redis)
2. Guarantee of message delivery via consumer acknowledgements.

@@ -24,20 +24,35 @@ 3. Consumer Group functionality for scalability. (Just like Kafka)

const payloadId = await broker.publish({ a: "Hello", b: "World" }); //Used to publish a paylod on stream.
//Used to publish a paylod on stream.
const payloadId = await broker.publish({ a: "Hello", b: "World" });
const consumerGroup = await broker.joinConsumerGroup("MyGroup"); //Creates a consumer group to receive payload
//Creates a consumer group to receive payload
const consumerGroup = await broker.joinConsumerGroup("MyGroup");
const subscriptionHandle = await consumerGroup.subscribe("Consumer1", newMessageHandler); //Registers a new consumer with Name and Callback for message handlling.
//Registers a new consumer with Name and Callback for message handlling.
const subscriptionHandle = await consumerGroup.subscribe("Consumer1", newMessageHandler);
// Handler for arriving Payload
async function newMessageHandler(payload) {
console.log("Payload Id:", payload.id); //Payload Id
console.log("Payload Received from :", payload.channel); //Stream name
console.log("Actual Payload:", payload.payload); //Actual Payload
await payload.markAsRead(); //Payload is marked as delivered or Acked.
for (let index = 0; index < payload.length; index++) {
try {
const element = payload[index];
console.log("Payload Id:", element.id); //Payload Id
console.log("Payload Received from :", element.channel); //Stream name
console.log("Actual Payload:", element.payload); //Actual Payload
await element.markAsRead(); //Payload is marked as delivered or Acked also optionaly the message can be dropped.
}
catch (exception) {
console.error(exception);
}
}
}
const summary = await consumerGroup.pendingSummary(); //Provides summary of payloads which have delivered but not acked yet.
//Provides summary of payloads which have delivered but not acked yet.
const summary = await consumerGroup.pendingSummary();
const sucess = consumerGroup.unsubscribe(subscriptionHandle); //Unsubscribes the consumer from the group.
//Unsubscribes the consumer from the group.
const sucess = consumerGroup.unsubscribe(subscriptionHandle);
const consumedMem = await broker.memoryFootprint(); //Amount of memory consumed by this stream in bytes.
//Amount of memory consumed by this stream in bytes.
const consumedMem = await broker.memoryFootprint();

@@ -50,2 +65,4 @@ ```

2. [IORedis](https://www.npmjs.com/package/ioredis).
3. [shortid](https://www.npmjs.com/package/shortid).
4. [redis-scripto](https://www.npmjs.com/package/redis-scripto).

@@ -58,3 +75,3 @@ ## Contributions

## Current Version:
0.0.5[Beta]
0.0.7[Beta]

@@ -61,0 +78,0 @@ ## License

const assert = require('assert');
const utils = require('./test-utils');
const localRedisConnectionString = "redis://127.0.0.1:6379/";
const redisType = require("ioredis");
const redisClient = new redisType(localRedisConnectionString);
const targetType = require('../index').StreamChannelBroker;

@@ -11,6 +13,8 @@ const channelName = "Channel1";

this.beforeAll(async function () {
target = new targetType(localRedisConnectionString, channelName);
target = new targetType(redisClient, channelName);
});
this.afterAll(async function () {
await target.destroy();
await redisClient.quit();
await redisClient.disconnect();
});

@@ -17,0 +21,0 @@ this.beforeEach(async function () {

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc