Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

redis-streams-broker

Package Overview
Dependencies
Maintainers
1
Versions
15
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

redis-streams-broker - npm Package Compare versions

Comparing version 0.0.8 to 0.0.9

10

index.d.ts

@@ -1,4 +0,4 @@

import redisNs from 'ioredis'
export declare class StreamChannelBroker {
constructor(redisClient: redisNs.Redis, channelName: string);
constructor(redisClient: any, channelName: string);
publish(payload: any, maximumApproximateMessages?: number): Promise<string>;

@@ -13,4 +13,4 @@ destroy(): Promise<boolean>;

readFrom: string;
subscribe(consumerName: string, handler: (payload: Payload[]) => Promise<boolean>, pollSpan?: number, payloadsToFetch?: number, subscriptionHandle?: string): Promise<string>;
unsubscribe(subscriptionHandle: string): Promise<string>;
subscribe(consumerName: string, handler: (payload: Payload[]) => Promise<boolean>, pollSpan?: number, payloadsToFetch?: number, subscriptionHandle?: string, readPending?: boolean): Promise<string>;
unsubscribe(subscriptionHandle: string): Promise<boolean>;
pendingSummary(): Promise<GroupSummary>;

@@ -23,3 +23,3 @@ }

payload: any;
markAsRead(dropMessage?: boolean): Promise<boolean>;
markAsRead(deleteMessage?: boolean): Promise<boolean>;
}

@@ -26,0 +26,0 @@

const shortid = require("shortid");
const scripto = require('redis-scripto');
const path = require('path');
class StreamChannelBroker {
#scriptingEngine;

@@ -23,12 +21,2 @@ constructor(redisClient, channelName) {

this._groupPendingSummary = this._destroyingCheckWrapper(this._groupPendingSummary.bind(this), false);
this.#scriptingEngine = new scripto(this._redisClient);
this.#scriptingEngine.loadFromDir(path.resolve(path.dirname(__filename), 'lua'));
this.#scriptingEngine.runLuaScriptAsync = async (scriptName, keys, args) => new Promise((resolve, reject) => this.#scriptingEngine.run(scriptName, keys, args, function (err, result) {
if (err != undefined) {
reject(err);
return;
}
resolve(result)
}));
this.#scriptingEngine.runLuaScriptAsync.bind(this.#scriptingEngine);
}

@@ -56,9 +44,13 @@

async _subscribe(groupName, consumerName, handler, pollSpan = 1000, payloadsToFetch = 2, subscriptionHandle = shortid.generate()) {
async _subscribe(groupName, consumerName, handler, pollSpan = 1000, payloadsToFetch = 2, subscriptionHandle = shortid.generate(), readPending = false) {
const intervalHandle = setTimeout(async () => {
try {
const messages = await this._redisClient.xreadgroup("GROUP", groupName, consumerName, "COUNT", payloadsToFetch, "STREAMS", this._channelName, ">");
const messages = await this._redisClient.xreadgroup("GROUP", groupName, consumerName, "COUNT", payloadsToFetch, "STREAMS", this._channelName, (readPending === false ? ">" : "0"));
if (messages !== null) {
let streamPayloads = this._transformResponseToMessage(messages, groupName);
await handler(streamPayloads);
if (streamPayloads.length === 0 & readPending === true) {// The server should respond back with zero and not with null response. //Look at usage example https://redis.io/commands/xreadgroup
//This means all pending messages are processed.
readPending = false;
}
}

@@ -68,3 +60,3 @@ }

if (this._destroying === false && this._unsubscribe(subscriptionHandle)) {
await this._subscribe(groupName, consumerName, handler, pollSpan, payloadsToFetch, subscriptionHandle);
await this._subscribe(groupName, consumerName, handler, pollSpan, payloadsToFetch, subscriptionHandle, readPending);
}

@@ -94,2 +86,3 @@ }

result = await this._redisClient.xack(this._channelName, groupName, messageId);
return result === 1;
} else {

@@ -100,4 +93,4 @@ result = await this._redisClient.multi()

.exec();
return result[0][1] === result[1][1] && result[0][1] === 1;
}
return result === 1;
}

@@ -134,3 +127,10 @@

async joinConsumerGroup(groupName, readFrom = '$') {
await this.#scriptingEngine.runLuaScriptAsync("create-group", [this._channelName], [groupName, readFrom]);
try {
await this._redisClient.xgroup("CREATE", this._channelName, groupName, readFrom, "MKSTREAM");
}
catch (err) {
if (!err.message === 'BUSYGROUP Consumer Group name already exists')
throw err;
}
return {

@@ -137,0 +137,0 @@ "name": groupName,

@@ -0,0 +0,0 @@ This is free and unencumbered software released into the public domain.

{
"name": "redis-streams-broker",
"version": "0.0.8",
"version": "0.0.9",
"description": "This package is a broker to redis stream data type, This package provides guaranteed message delivery feature with acknowledgement.",

@@ -8,3 +8,3 @@ "main": "index.js",

"mocha": "mocha",
"test": "npm run redisstart && npm run mocha && npm run redisstop",
"test": "npm run redisstart && npm run mocha && npm run redisstop && npm run redisstart && env REDISCLIENT=1 npm run mocha && npm run redisstop",
"redisstop": "docker stop streamz",

@@ -19,2 +19,3 @@ "redisstart": "npm run redisstop || node -v && docker run --name streamz -p 6379:6379 -itd --rm redis:latest"

"redis",
"stream",
"streams",

@@ -34,10 +35,9 @@ "message",

"dependencies": {
"@types/ioredis": "^4.17.6",
"ioredis": "^4.16.2",
"redis-scripto": "^0.1.3",
"shortid": "^2.2.15"
},
"devDependencies": {
"mocha": "^7.1.1"
"ioredis": "^4.26.0",
"mocha": "^7.1.1",
"redis": "^3.1.2"
}
}
# redis-streams-broker
This simple package is based on redis streams data type which provides you with following features
1. Centralized Que. (Using Redis)
2. Guarantee of message delivery via consumer acknowledgements.
3. Consumer Group functionality for scalability. (Just like Kafka)
This package is based on [redis stream](https://github.com/LRagji/redis-streams-broker) data type which provides you with following features
1. Broker to redis stream which can be used as centralized que between microservices. (Using Redis)
2. Support for injectable redis client (be it [ioredis](https://www.npmjs.com/package/ioredis) or [redis](https://www.npmjs.com/package/redis))
3. Guarantee of message delivery via consumer acknowledgements.
4. Consumer Group functionality for scalability. (Just like Kafka)
5. Option to drop a message when its acked, thus keeping memory footprint in check.

@@ -13,3 +15,3 @@ ## Getting Started

3. Run redis on local docker if required. `docker run --name streamz -p 6379:6379 -itd --rm redis:latest`
3. Instantiate with a redis connection and name for the stream. `const broker = new brokerType(redisConnectionString, name);`
3. Instantiate with a redis client and name for the stream. `const broker = new brokerType(redisClient, name);`
4. All done, Start using it!!.

@@ -19,7 +21,12 @@

1. Please find example code for injectable ioredis client [here](https://github.com/LRagji/redis-streams-broker/blob/master/examples/ioredis.js)
2. Please find example code for injectable custom client [here](https://github.com/LRagji/redis-streams-broker/blob/master/examples/custom.js)
```javascript
const Redis = require("ioredis");
const redisConnectionString = "redis://127.0.0.1:6379/";
const qName = "Queue";
const redisClient = new Redis(redisConnectionString);
const brokerType = require('redis-streams-broker').StreamChannelBroker;
const broker = new brokerType(redisConnectionString, qName);
const broker = new brokerType(redisClient, qName);

@@ -64,6 +71,4 @@ //Used to publish a paylod on stream.

1. Authors love for Open Source.
2. [IORedis](https://www.npmjs.com/package/ioredis).
3. [shortid](https://www.npmjs.com/package/shortid).
4. [redis-scripto](https://www.npmjs.com/package/redis-scripto).
1. Authors :heart for Open Source.
2. [shortid](https://www.npmjs.com/package/shortid).

@@ -76,3 +81,3 @@ ## Contributions

## Current Version:
0.0.8[Beta]
0.0.9[Beta]

@@ -82,1 +87,86 @@ ## License

This project is contrubution to public domain and completely free for use, view [LICENSE.md](/license.md) file for details.
## API
Class `StreamChannelBroker`
1. `constructor(redisClient: any, channelName: string)`
Creates a broker instance.
*redisClient*: Injectable redis client which will be used to send commands to redis server.
*channelName*: Name of the stream key, if this doesnot exists it will be created on first push or group subscription.
2. `publish(payload: any, maximumApproximateMessages?: number): Promise<string>;`
Publishes provided message into the stream and returns id generated by server.
*payload*: A JS object containing properties which are passed as key values pairs.
*maximumApproximateMessages*: Appropiate length of the stream it is equal to `~ MAXLENGTH` option in redis. Defaulted to 100.
3. `joinConsumerGroup(groupName: string, readFrom: string): Promise<ConsumerGroup>`
Creates a consumer group on the given redis stream with information provided, if the group exists does nothing returning a `ConsumerGroup` object.
*groupName*: Name of the group to be created ot joined.
*readFrom*: Id of the mesage to start reading from. defaulted to `$` to only read new messages recevied on redis, check [redis docs](https://redis.io/commands/xgroup) for more info.
4. `memoryFootprint(): Promise<number>`
Returns number of bytes consumed by the current stream.
5. `destroy(): Promise<boolean>;`
Starts to unsubscribe all the handles that were subscribed to this instance.
Class `ConsumerGroup`
1. `subscribe(consumerName: string, handler: (payload: Payload[]) => Promise<boolean>, pollSpan?: number, payloadsToFetch?: number, subscriptionHandle?: string, readPending?: boolean): Promise<string>`
Subscribes to stream to start receiving events when new payload arrives, this internally creates a polling system to check for new messages in stream. returns subscription name.
*consumerName*: Name of the consumer who is subscribing via the consumer group object.
*handler*: A callback function which will be invoked when new message a.k.a payload(s) arrive. Should be of signature `(payload: Payload[]) => Promise<boolean>` should be async return from this function is ignored for now, look at `Payload` class below for more details.
*pollSpan*: Number of millisecond to wait after completion of handler to check for next available message in stream. Defaulted to 1000 milliseconds.
*payloadsToFetch*: Maximum number of messages to fetch in one poll to server this is simillar to `COUNT` command in redis, this is optional and defaulted to 2.
*subscriptionHandle*: Name for subscription handler this is what will be returned from the function, this is defaulted to unique shortid.
*readPending*: If set to `true` will read all messages from start of the stream ie: Id = 0 which are in pending list of this consumer and group, once all pending are read it will automatically switch to latest messages from the stream. If set to `false` it will always look for new message from the stream, this is defaulted to `false`.
2. `unsubscribe(subscriptionHandle: string): Promise<boolean>`
Unsubscribes from the stream for the given subscriptionhandle, returns true for sucess and false for failure.
*subscriptionHandle*: Name of the subscription handle which was returned by subscribe api.
3. `pendingSummary(): Promise<GroupSummary>`
Returns details of the pending items for the given group by exposing `GroupSummary` object.
Class `Payload`
1. `channel: string`: Name of the stream key in redis.
2. `id: string`: Id of the message being received.
3. `payload: any`: Actual payload to processs.
4. `markAsRead(deleteMessage?: boolean): Promise<boolean>`
This function helps to ack the payload as read or processed, returns status of the operation via boolean return type `true` indicating success.
*deleteMessage*: if set to `true` it will ack & delete the message from the stream if set to `false` will only ack the message defaulted to false.
Class `GroupSummary`
1. `total: number`: This is the total number of messages in pending list.
2. `firstId: string`: Id of the first message which is pending.
3. `lastId: string`: Id of the last message which is pending.
4. `consumerStats: any`: Extra information provided by `XPENDING` command.
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc