redis-streams-broker
Advanced tools
Comparing version 0.0.6 to 0.0.7
@@ -0,7 +1,8 @@ | ||
import redisNs from '@types/ioredis' | ||
export declare class StreamChannelBroker { | ||
constructor(redisConnectionString: string, channelName: string); | ||
constructor(redisClient: redisNs.Redis, channelName: string); | ||
publish(payload: any, maximumApproximateMessages?: number): Promise<string>; | ||
destroy(): Promise<boolean>; | ||
joinConsumerGroup(groupName: string, readFrom: string): Promise<ConsumerGroup>; | ||
memoryFootprint():Promise<integer>; | ||
memoryFootprint(): Promise<integer>; | ||
} | ||
@@ -21,3 +22,3 @@ | ||
payload: any; | ||
markAsRead(): Promise<boolean>; | ||
markAsRead(dropMessage?: boolean): Promise<boolean>; | ||
} | ||
@@ -24,0 +25,0 @@ |
57
index.js
@@ -1,8 +0,10 @@ | ||
const redisType = require("ioredis"); | ||
const shortid = require("shortid"); | ||
const scripto = require('redis-scripto'); | ||
const path = require('path'); | ||
class StreamChannelBroker { | ||
#scriptingEngine; | ||
constructor(redisConnectionString, channelName) { | ||
this._redisClient = new redisType(redisConnectionString); | ||
constructor(redisClient, channelName) { | ||
this._redisClient = redisClient; | ||
this._destroying = false; | ||
@@ -21,2 +23,12 @@ this._channelName = channelName; | ||
this._groupPendingSummary = this._destroyingCheckWrapper(this._groupPendingSummary.bind(this), false); | ||
this.#scriptingEngine = new scripto(this._redisClient); | ||
this.#scriptingEngine.loadFromDir(path.resolve(path.dirname(__filename), 'lua')); | ||
this.#scriptingEngine.runLuaScriptAsync = async (scriptName, keys, args) => new Promise((resolve, reject) => this.#scriptingEngine.run(scriptName, keys, args, function (err, result) { | ||
if (err != undefined) { | ||
reject(err); | ||
return; | ||
} | ||
resolve(result) | ||
})); | ||
this.#scriptingEngine.runLuaScriptAsync.bind(this.#scriptingEngine); | ||
} | ||
@@ -73,4 +85,12 @@ | ||
async _acknowledgeMessage(groupName, messageId) { | ||
let result = await this._redisClient.xack(this._channelName, groupName, messageId); | ||
async _acknowledgeMessage(groupName, messageId, dropMessage = false) { | ||
let result; | ||
if (dropMessage === false) { | ||
result = await this._redisClient.xack(this._channelName, groupName, messageId); | ||
} else { | ||
result = await this._redisClient.multi() | ||
.xack(this._channelName, groupName, messageId) | ||
.xdel(this._channelName, messageId) | ||
.exec(); | ||
} | ||
return result === 1; | ||
@@ -85,3 +105,4 @@ } | ||
let messageId = responses[responseIdx][1][messageIdIdx][0]; | ||
let payload = { "channel": streamName, "id": messageId, "markAsRead": async () => await this._acknowledgeMessage(groupName, messageId), payload: {} }; | ||
let payload = { "channel": streamName, "id": messageId, payload: {} }; | ||
payload["markAsRead"] = async (dropMessage) => await this._acknowledgeMessage(groupName, messageId, dropMessage); | ||
for (let propertyIdx = 0; propertyIdx < responses[responseIdx][1][messageIdIdx][1].length;) { | ||
@@ -108,13 +129,3 @@ payload.payload[responses[responseIdx][1][messageIdIdx][1][propertyIdx]] = responses[responseIdx][1][messageIdIdx][1][propertyIdx + 1]; | ||
async joinConsumerGroup(groupName, readFrom = '$') { | ||
const keyExists = await this._redisClient.exists(this._channelName); | ||
if (keyExists === 1) { | ||
const existingGroups = await this._redisClient.xinfo("GROUPS", this._channelName); | ||
if (existingGroups.find(e => e[1] === groupName) === undefined) { | ||
await this._redisClient.xgroup("CREATE", this._channelName, groupName, readFrom); | ||
} | ||
} | ||
else { | ||
await this._redisClient.xgroup("CREATE", this._channelName, groupName, readFrom, "MKSTREAM"); | ||
} | ||
await this.#scriptingEngine.runLuaScriptAsync("create-group", [this._channelName], [groupName, readFrom]); | ||
return { | ||
@@ -151,8 +162,12 @@ "name": groupName, | ||
} | ||
return await this._redisClient.xadd(this._channelName, 'MAXLEN', '~', maximumApproximateMessages, '*', ...keyValuePairs); | ||
if (maximumApproximateMessages < 0) { | ||
return await this._redisClient.xadd(this._channelName, '*', ...keyValuePairs); | ||
} | ||
else { | ||
return await this._redisClient.xadd(this._channelName, 'MAXLEN', '~', maximumApproximateMessages, '*', ...keyValuePairs); | ||
} | ||
} | ||
async memoryFootprint() { | ||
return await this._redisClient.memory("usage", this._channelName,"samples",0); | ||
return await this._redisClient.memory("usage", this._channelName, "samples", 0); | ||
} | ||
@@ -163,4 +178,2 @@ | ||
let result = Array.from(this._activeSubscriptions.keys).reduce(((pre, handle) => this._unsubscribe(handle) & pre), true); | ||
await this._redisClient.quit(); | ||
await this._redisClient.disconnect(); | ||
return result; | ||
@@ -167,0 +180,0 @@ } |
{ | ||
"name": "redis-streams-broker", | ||
"version": "0.0.6", | ||
"version": "0.0.7", | ||
"description": "This package is a broker to redis stream data type, This package provides guaranteed message delivery feature with acknowledgement.", | ||
@@ -26,3 +26,3 @@ "main": "index.js", | ||
"author": "Laukik", | ||
"license": "SEE LICENSE IN <license.md>", | ||
"license": "SEE LICENSE IN license.md", | ||
"bugs": { | ||
@@ -33,3 +33,5 @@ "url": "https://github.com/LRagji/redis-streams-broker/issues" | ||
"dependencies": { | ||
"@types/ioredis": "^4.17.6", | ||
"ioredis": "^4.16.2", | ||
"redis-scripto": "^0.1.3", | ||
"shortid": "^2.2.15" | ||
@@ -36,0 +38,0 @@ }, |
# redis-streams-broker | ||
This simple package is based on redis streams data type which provides you with following features | ||
1. Centeralized Que. (Using Redis) | ||
1. Centralized Que. (Using Redis) | ||
2. Guarantee of message delivery via consumer acknowledgements. | ||
@@ -24,20 +24,35 @@ 3. Consumer Group functionality for scalability. (Just like Kafka) | ||
const payloadId = await broker.publish({ a: "Hello", b: "World" }); //Used to publish a paylod on stream. | ||
//Used to publish a paylod on stream. | ||
const payloadId = await broker.publish({ a: "Hello", b: "World" }); | ||
const consumerGroup = await broker.joinConsumerGroup("MyGroup"); //Creates a consumer group to receive payload | ||
//Creates a consumer group to receive payload | ||
const consumerGroup = await broker.joinConsumerGroup("MyGroup"); | ||
const subscriptionHandle = await consumerGroup.subscribe("Consumer1", newMessageHandler); //Registers a new consumer with Name and Callback for message handlling. | ||
//Registers a new consumer with Name and Callback for message handlling. | ||
const subscriptionHandle = await consumerGroup.subscribe("Consumer1", newMessageHandler); | ||
// Handler for arriving Payload | ||
async function newMessageHandler(payload) { | ||
console.log("Payload Id:", payload.id); //Payload Id | ||
console.log("Payload Received from :", payload.channel); //Stream name | ||
console.log("Actual Payload:", payload.payload); //Actual Payload | ||
await payload.markAsRead(); //Payload is marked as delivered or Acked. | ||
for (let index = 0; index < payload.length; index++) { | ||
try { | ||
const element = payload[index]; | ||
console.log("Payload Id:", element.id); //Payload Id | ||
console.log("Payload Received from :", element.channel); //Stream name | ||
console.log("Actual Payload:", element.payload); //Actual Payload | ||
await element.markAsRead(); //Payload is marked as delivered or Acked also optionaly the message can be dropped. | ||
} | ||
catch (exception) { | ||
console.error(exception); | ||
} | ||
} | ||
} | ||
const summary = await consumerGroup.pendingSummary(); //Provides summary of payloads which have delivered but not acked yet. | ||
//Provides summary of payloads which have delivered but not acked yet. | ||
const summary = await consumerGroup.pendingSummary(); | ||
const sucess = consumerGroup.unsubscribe(subscriptionHandle); //Unsubscribes the consumer from the group. | ||
//Unsubscribes the consumer from the group. | ||
const sucess = consumerGroup.unsubscribe(subscriptionHandle); | ||
const consumedMem = await broker.memoryFootprint(); //Amount of memory consumed by this stream in bytes. | ||
//Amount of memory consumed by this stream in bytes. | ||
const consumedMem = await broker.memoryFootprint(); | ||
@@ -50,2 +65,4 @@ ``` | ||
2. [IORedis](https://www.npmjs.com/package/ioredis). | ||
3. [shortid](https://www.npmjs.com/package/shortid). | ||
4. [redis-scripto](https://www.npmjs.com/package/redis-scripto). | ||
@@ -58,3 +75,3 @@ ## Contributions | ||
## Current Version: | ||
0.0.5[Beta] | ||
0.0.7[Beta] | ||
@@ -61,0 +78,0 @@ ## License |
const assert = require('assert'); | ||
const utils = require('./test-utils'); | ||
const localRedisConnectionString = "redis://127.0.0.1:6379/"; | ||
const redisType = require("ioredis"); | ||
const redisClient = new redisType(localRedisConnectionString); | ||
const targetType = require('../index').StreamChannelBroker; | ||
@@ -11,6 +13,8 @@ const channelName = "Channel1"; | ||
this.beforeAll(async function () { | ||
target = new targetType(localRedisConnectionString, channelName); | ||
target = new targetType(redisClient, channelName); | ||
}); | ||
this.afterAll(async function () { | ||
await target.destroy(); | ||
await redisClient.quit(); | ||
await redisClient.disconnect(); | ||
}); | ||
@@ -17,0 +21,0 @@ this.beforeEach(async function () { |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
27992
9
402
78
4
+ Added@types/ioredis@^4.17.6
+ Addedredis-scripto@^0.1.3
+ Added@types/ioredis@4.28.10(transitive)
+ Added@types/node@22.9.3(transitive)
+ Addeddebug@0.7.4(transitive)
+ Addedredis@0.8.6(transitive)
+ Addedredis-scripto@0.1.3(transitive)
+ Addedundici-types@6.19.8(transitive)