redis-streams-broker
Advanced tools
Comparing version 0.0.8 to 0.0.9
@@ -1,4 +0,4 @@ | ||
import redisNs from 'ioredis' | ||
export declare class StreamChannelBroker { | ||
constructor(redisClient: redisNs.Redis, channelName: string); | ||
constructor(redisClient: any, channelName: string); | ||
publish(payload: any, maximumApproximateMessages?: number): Promise<string>; | ||
@@ -13,4 +13,4 @@ destroy(): Promise<boolean>; | ||
readFrom: string; | ||
subscribe(consumerName: string, handler: (payload: Payload[]) => Promise<boolean>, pollSpan?: number, payloadsToFetch?: number, subscriptionHandle?: string): Promise<string>; | ||
unsubscribe(subscriptionHandle: string): Promise<string>; | ||
subscribe(consumerName: string, handler: (payload: Payload[]) => Promise<boolean>, pollSpan?: number, payloadsToFetch?: number, subscriptionHandle?: string, readPending?: boolean): Promise<string>; | ||
unsubscribe(subscriptionHandle: string): Promise<boolean>; | ||
pendingSummary(): Promise<GroupSummary>; | ||
@@ -23,3 +23,3 @@ } | ||
payload: any; | ||
markAsRead(dropMessage?: boolean): Promise<boolean>; | ||
markAsRead(deleteMessage?: boolean): Promise<boolean>; | ||
} | ||
@@ -26,0 +26,0 @@ |
34
index.js
const shortid = require("shortid"); | ||
const scripto = require('redis-scripto'); | ||
const path = require('path'); | ||
class StreamChannelBroker { | ||
#scriptingEngine; | ||
@@ -23,12 +21,2 @@ constructor(redisClient, channelName) { | ||
this._groupPendingSummary = this._destroyingCheckWrapper(this._groupPendingSummary.bind(this), false); | ||
this.#scriptingEngine = new scripto(this._redisClient); | ||
this.#scriptingEngine.loadFromDir(path.resolve(path.dirname(__filename), 'lua')); | ||
this.#scriptingEngine.runLuaScriptAsync = async (scriptName, keys, args) => new Promise((resolve, reject) => this.#scriptingEngine.run(scriptName, keys, args, function (err, result) { | ||
if (err != undefined) { | ||
reject(err); | ||
return; | ||
} | ||
resolve(result) | ||
})); | ||
this.#scriptingEngine.runLuaScriptAsync.bind(this.#scriptingEngine); | ||
} | ||
@@ -56,9 +44,13 @@ | ||
async _subscribe(groupName, consumerName, handler, pollSpan = 1000, payloadsToFetch = 2, subscriptionHandle = shortid.generate()) { | ||
async _subscribe(groupName, consumerName, handler, pollSpan = 1000, payloadsToFetch = 2, subscriptionHandle = shortid.generate(), readPending = false) { | ||
const intervalHandle = setTimeout(async () => { | ||
try { | ||
const messages = await this._redisClient.xreadgroup("GROUP", groupName, consumerName, "COUNT", payloadsToFetch, "STREAMS", this._channelName, ">"); | ||
const messages = await this._redisClient.xreadgroup("GROUP", groupName, consumerName, "COUNT", payloadsToFetch, "STREAMS", this._channelName, (readPending === false ? ">" : "0")); | ||
if (messages !== null) { | ||
let streamPayloads = this._transformResponseToMessage(messages, groupName); | ||
await handler(streamPayloads); | ||
if (streamPayloads.length === 0 & readPending === true) {// The server should respond back with zero and not with null response. //Look at usage example https://redis.io/commands/xreadgroup | ||
//This means all pending messages are processed. | ||
readPending = false; | ||
} | ||
} | ||
@@ -68,3 +60,3 @@ } | ||
if (this._destroying === false && this._unsubscribe(subscriptionHandle)) { | ||
await this._subscribe(groupName, consumerName, handler, pollSpan, payloadsToFetch, subscriptionHandle); | ||
await this._subscribe(groupName, consumerName, handler, pollSpan, payloadsToFetch, subscriptionHandle, readPending); | ||
} | ||
@@ -94,2 +86,3 @@ } | ||
result = await this._redisClient.xack(this._channelName, groupName, messageId); | ||
return result === 1; | ||
} else { | ||
@@ -100,4 +93,4 @@ result = await this._redisClient.multi() | ||
.exec(); | ||
return result[0][1] === result[1][1] && result[0][1] === 1; | ||
} | ||
return result === 1; | ||
} | ||
@@ -134,3 +127,10 @@ | ||
async joinConsumerGroup(groupName, readFrom = '$') { | ||
await this.#scriptingEngine.runLuaScriptAsync("create-group", [this._channelName], [groupName, readFrom]); | ||
try { | ||
await this._redisClient.xgroup("CREATE", this._channelName, groupName, readFrom, "MKSTREAM"); | ||
} | ||
catch (err) { | ||
if (!err.message === 'BUSYGROUP Consumer Group name already exists') | ||
throw err; | ||
} | ||
return { | ||
@@ -137,0 +137,0 @@ "name": groupName, |
@@ -0,0 +0,0 @@ This is free and unencumbered software released into the public domain. |
{ | ||
"name": "redis-streams-broker", | ||
"version": "0.0.8", | ||
"version": "0.0.9", | ||
"description": "This package is a broker to redis stream data type, This package provides guaranteed message delivery feature with acknowledgement.", | ||
@@ -8,3 +8,3 @@ "main": "index.js", | ||
"mocha": "mocha", | ||
"test": "npm run redisstart && npm run mocha && npm run redisstop", | ||
"test": "npm run redisstart && npm run mocha && npm run redisstop && npm run redisstart && env REDISCLIENT=1 npm run mocha && npm run redisstop", | ||
"redisstop": "docker stop streamz", | ||
@@ -19,2 +19,3 @@ "redisstart": "npm run redisstop || node -v && docker run --name streamz -p 6379:6379 -itd --rm redis:latest" | ||
"redis", | ||
"stream", | ||
"streams", | ||
@@ -34,10 +35,9 @@ "message", | ||
"dependencies": { | ||
"@types/ioredis": "^4.17.6", | ||
"ioredis": "^4.16.2", | ||
"redis-scripto": "^0.1.3", | ||
"shortid": "^2.2.15" | ||
}, | ||
"devDependencies": { | ||
"mocha": "^7.1.1" | ||
"ioredis": "^4.26.0", | ||
"mocha": "^7.1.1", | ||
"redis": "^3.1.2" | ||
} | ||
} |
112
readme.md
# redis-streams-broker | ||
This simple package is based on redis streams data type which provides you with following features | ||
1. Centralized Que. (Using Redis) | ||
2. Guarantee of message delivery via consumer acknowledgements. | ||
3. Consumer Group functionality for scalability. (Just like Kafka) | ||
This package is based on [redis stream](https://github.com/LRagji/redis-streams-broker) data type which provides you with following features | ||
1. Broker to redis stream which can be used as centralized que between microservices. (Using Redis) | ||
2. Support for injectable redis client (be it [ioredis](https://www.npmjs.com/package/ioredis) or [redis](https://www.npmjs.com/package/redis)) | ||
3. Guarantee of message delivery via consumer acknowledgements. | ||
4. Consumer Group functionality for scalability. (Just like Kafka) | ||
5. Option to drop a message when its acked, thus keeping memory footprint in check. | ||
@@ -13,3 +15,3 @@ ## Getting Started | ||
3. Run redis on local docker if required. `docker run --name streamz -p 6379:6379 -itd --rm redis:latest` | ||
3. Instantiate with a redis connection and name for the stream. `const broker = new brokerType(redisConnectionString, name);` | ||
3. Instantiate with a redis client and name for the stream. `const broker = new brokerType(redisClient, name);` | ||
4. All done, Start using it!!. | ||
@@ -19,7 +21,12 @@ | ||
1. Please find example code for injectable ioredis client [here](https://github.com/LRagji/redis-streams-broker/blob/master/examples/ioredis.js) | ||
2. Please find example code for injectable custom client [here](https://github.com/LRagji/redis-streams-broker/blob/master/examples/custom.js) | ||
```javascript | ||
const Redis = require("ioredis"); | ||
const redisConnectionString = "redis://127.0.0.1:6379/"; | ||
const qName = "Queue"; | ||
const redisClient = new Redis(redisConnectionString); | ||
const brokerType = require('redis-streams-broker').StreamChannelBroker; | ||
const broker = new brokerType(redisConnectionString, qName); | ||
const broker = new brokerType(redisClient, qName); | ||
@@ -64,6 +71,4 @@ //Used to publish a paylod on stream. | ||
1. Authors love for Open Source. | ||
2. [IORedis](https://www.npmjs.com/package/ioredis). | ||
3. [shortid](https://www.npmjs.com/package/shortid). | ||
4. [redis-scripto](https://www.npmjs.com/package/redis-scripto). | ||
1. Authors :heart for Open Source. | ||
2. [shortid](https://www.npmjs.com/package/shortid). | ||
@@ -76,3 +81,3 @@ ## Contributions | ||
## Current Version: | ||
0.0.8[Beta] | ||
0.0.9[Beta] | ||
@@ -82,1 +87,86 @@ ## License | ||
This project is contrubution to public domain and completely free for use, view [LICENSE.md](/license.md) file for details. | ||
## API | ||
Class `StreamChannelBroker` | ||
1. `constructor(redisClient: any, channelName: string)` | ||
Creates a broker instance. | ||
*redisClient*: Injectable redis client which will be used to send commands to redis server. | ||
*channelName*: Name of the stream key, if this doesnot exists it will be created on first push or group subscription. | ||
2. `publish(payload: any, maximumApproximateMessages?: number): Promise<string>;` | ||
Publishes provided message into the stream and returns id generated by server. | ||
*payload*: A JS object containing properties which are passed as key values pairs. | ||
*maximumApproximateMessages*: Appropiate length of the stream it is equal to `~ MAXLENGTH` option in redis. Defaulted to 100. | ||
3. `joinConsumerGroup(groupName: string, readFrom: string): Promise<ConsumerGroup>` | ||
Creates a consumer group on the given redis stream with information provided, if the group exists does nothing returning a `ConsumerGroup` object. | ||
*groupName*: Name of the group to be created ot joined. | ||
*readFrom*: Id of the mesage to start reading from. defaulted to `$` to only read new messages recevied on redis, check [redis docs](https://redis.io/commands/xgroup) for more info. | ||
4. `memoryFootprint(): Promise<number>` | ||
Returns number of bytes consumed by the current stream. | ||
5. `destroy(): Promise<boolean>;` | ||
Starts to unsubscribe all the handles that were subscribed to this instance. | ||
Class `ConsumerGroup` | ||
1. `subscribe(consumerName: string, handler: (payload: Payload[]) => Promise<boolean>, pollSpan?: number, payloadsToFetch?: number, subscriptionHandle?: string, readPending?: boolean): Promise<string>` | ||
Subscribes to stream to start receiving events when new payload arrives, this internally creates a polling system to check for new messages in stream. returns subscription name. | ||
*consumerName*: Name of the consumer who is subscribing via the consumer group object. | ||
*handler*: A callback function which will be invoked when new message a.k.a payload(s) arrive. Should be of signature `(payload: Payload[]) => Promise<boolean>` should be async return from this function is ignored for now, look at `Payload` class below for more details. | ||
*pollSpan*: Number of millisecond to wait after completion of handler to check for next available message in stream. Defaulted to 1000 milliseconds. | ||
*payloadsToFetch*: Maximum number of messages to fetch in one poll to server this is simillar to `COUNT` command in redis, this is optional and defaulted to 2. | ||
*subscriptionHandle*: Name for subscription handler this is what will be returned from the function, this is defaulted to unique shortid. | ||
*readPending*: If set to `true` will read all messages from start of the stream ie: Id = 0 which are in pending list of this consumer and group, once all pending are read it will automatically switch to latest messages from the stream. If set to `false` it will always look for new message from the stream, this is defaulted to `false`. | ||
2. `unsubscribe(subscriptionHandle: string): Promise<boolean>` | ||
Unsubscribes from the stream for the given subscriptionhandle, returns true for sucess and false for failure. | ||
*subscriptionHandle*: Name of the subscription handle which was returned by subscribe api. | ||
3. `pendingSummary(): Promise<GroupSummary>` | ||
Returns details of the pending items for the given group by exposing `GroupSummary` object. | ||
Class `Payload` | ||
1. `channel: string`: Name of the stream key in redis. | ||
2. `id: string`: Id of the message being received. | ||
3. `payload: any`: Actual payload to processs. | ||
4. `markAsRead(deleteMessage?: boolean): Promise<boolean>` | ||
This function helps to ack the payload as read or processed, returns status of the operation via boolean return type `true` indicating success. | ||
*deleteMessage*: if set to `true` it will ack & delete the message from the stream if set to `false` will only ack the message defaulted to false. | ||
Class `GroupSummary` | ||
1. `total: number`: This is the total number of messages in pending list. | ||
2. `firstId: string`: Id of the first message which is pending. | ||
3. `lastId: string`: Id of the last message which is pending. | ||
4. `consumerStats: any`: Extra information provided by `XPENDING` command. | ||
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
19138
1
168
3
5
188
- Removed@types/ioredis@^4.17.6
- Removedioredis@^4.16.2
- Removedredis-scripto@^0.1.3
- Removed@types/ioredis@4.28.10(transitive)
- Removed@types/node@22.9.1(transitive)
- Removedcluster-key-slot@1.1.2(transitive)
- Removeddebug@0.7.44.3.7(transitive)
- Removeddenque@1.5.1(transitive)
- Removedioredis@4.28.5(transitive)
- Removedlodash.defaults@4.2.0(transitive)
- Removedlodash.flatten@4.4.0(transitive)
- Removedlodash.isarguments@3.1.0(transitive)
- Removedms@2.1.3(transitive)
- Removedp-map@2.1.0(transitive)
- Removedredis@0.8.6(transitive)
- Removedredis-commands@1.7.0(transitive)
- Removedredis-errors@1.2.0(transitive)
- Removedredis-parser@3.0.0(transitive)
- Removedredis-scripto@0.1.3(transitive)
- Removedstandard-as-callback@2.1.0(transitive)
- Removedundici-types@6.19.8(transitive)