Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

redis-streams-broker

Package Overview
Dependencies
Maintainers
1
Versions
15
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

redis-streams-broker - npm Package Compare versions

Comparing version 0.0.10 to 0.0.11

asyncProcessor.js

4

index.d.ts
export declare class StreamChannelBroker {
constructor(redisClient: any, channelName: string);
publish(payload: any, maximumApproximateMessages?: number): Promise<string>;
publish(payload: any, maximumApproximateMessages?: number, failOnMaxMessageCount: boolean): Promise<string>;
destroy(): Promise<boolean>;

@@ -13,3 +13,3 @@ joinConsumerGroup(groupName: string, readFrom: string): Promise<ConsumerGroup>;

readFrom: string;
subscribe(consumerName: string, handler: (payload: Payload[]) => Promise<boolean>, pollSpan?: number, payloadsToFetch?: number, subscriptionHandle?: string, readPending?: boolean): Promise<string>;
subscribe(consumerName: string, handler: (payload: Payload[]) => Promise<number>, pollSpan?: number, payloadsToFetch?: number, subscriptionHandle?: string, readPending?: boolean): Promise<string>;
unsubscribe(subscriptionHandle: string): Promise<boolean>;

@@ -16,0 +16,0 @@ pendingSummary(): Promise<GroupSummary>;

const shortid = require("shortid");
const path = require('path');
const Scripto = require("redis-scripto");
class StreamChannelBroker {
constructor(redisClient, channelName) {
constructor(redisClient, channelName, scriptManager = new Scripto(redisClient)) {
this._scriptManager = scriptManager;
this._redisClient = redisClient;

@@ -21,2 +22,3 @@ this._destroying = false;

this._groupPendingSummary = this._destroyingCheckWrapper(this._groupPendingSummary.bind(this), false);
this._scriptManager.loadFromDir('./scripts');
}

@@ -47,10 +49,14 @@

try {
const messages = await this._redisClient.xreadgroup("GROUP", groupName, consumerName, "COUNT", payloadsToFetch, "BLOCK", pollSpan, "STREAMS", this._channelName, (readPending === false ? ">" : "0"));
const messages = await this._redisClient.xreadgroup("GROUP", groupName, consumerName, "BLOCK", pollSpan, "COUNT", payloadsToFetch, "STREAMS", this._channelName, (readPending === false ? ">" : "0"));
if (messages !== null) {
let streamPayloads = this._transformResponseToMessage(messages, groupName);
await handler(streamPayloads);
if (streamPayloads.length === 0 & readPending === true) {// The server should respond back with zero and not with null response. //Look at usage example https://redis.io/commands/xreadgroup
//This means all pending messages are processed.
//This means all pending messages are processed for this consumer name.
readPending = false;
}
let nextPayloadToFetch = await handler(streamPayloads);
if (nextPayloadToFetch != null && !Number.isNaN(nextPayloadToFetch) && nextPayloadToFetch != "") {
payloadsToFetch = Number.parseInt(nextPayloadToFetch);
}
}

@@ -60,3 +66,5 @@ }

if (this._destroying === false && this._unsubscribe(subscriptionHandle)) {
await this._subscribe(groupName, consumerName, handler, pollSpan, payloadsToFetch, subscriptionHandle, readPending);
if (payloadsToFetch > 0) {
await this._subscribe(groupName, consumerName, handler, pollSpan, payloadsToFetch, subscriptionHandle, readPending);
}
}

@@ -150,3 +158,3 @@ }

async publish(payload, maximumApproximateMessages = 100) {
async publish(payload, maximumApproximateMessages = 100, failOnMaxMessageCount = false) {
let keyValuePairs = [];

@@ -176,2 +184,15 @@ const payloadType = typeof payload;

}
else if (maximumApproximateMessages > 0 && failOnMaxMessageCount === true) {
return new Promise((acc, rej) => {
this._scriptManager.run('addWithLimit', [this._channelName], [maximumApproximateMessages, ...keyValuePairs], (err, result) => {
if (err !== null) {
return rej(err);
}
if (result == null) {
return rej(new Error(`Maximum length exceeded!!, limit(${maximumApproximateMessages})`));
}
acc(result);
});
});
}
else {

@@ -194,2 +215,3 @@ return await this._redisClient.xadd(this._channelName, 'MAXLEN', '~', maximumApproximateMessages, '*', ...keyValuePairs);

exports.StreamChannelBroker = StreamChannelBroker;
exports.StreamChannelBroker = StreamChannelBroker;
exports.AsyncProcessor = require('./asyncProcessor');
{
"name": "redis-streams-broker",
"version": "0.0.10",
"version": "0.0.11",
"description": "This package is a broker to redis stream data type, This package provides guaranteed message delivery feature with acknowledgement.",

@@ -34,2 +34,3 @@ "main": "index.js",

"dependencies": {
"redis-scripto": "^0.1.3",
"shortid": "^2.2.15"

@@ -36,0 +37,0 @@ },

@@ -23,2 +23,3 @@ # redis-streams-broker

3. Please find multi threading examples [here](https://github.com/LRagji/redis-streams-broker/tree/master/examples/H-Scalling%20idempotent%20operation)
4. Please find async processing examples [here](https://github.com/LRagji/redis-streams-broker/tree/master/examples/Stream%20Processing)

@@ -72,3 +73,4 @@ ```javascript

1. Authors :heart for Open Source.
2. [shortid](https://www.npmjs.com/package/shortid).
2. [shortid](https://www.npmjs.com/package/shortid) for auto generating subscribtion handles.
3. [redis-scripto](https://www.npmjs.com/package/redis-scripto) for handling lua scripts.

@@ -99,3 +101,3 @@ ## Contributions

2. `publish(payload: any, maximumApproximateMessages?: number): Promise<string>;`
2. `publish(payload: any, maximumApproximateMessages?: number, failOnMaxMessageCount:boolean): Promise<string>;`

@@ -108,2 +110,4 @@ Publishes provided message into the stream and returns id generated by server.

*failOnMaxMessageCount*: if *maximumApproximateMessages* and *failOnMaxMessageCount* is set to true then it will only publish messages untill it reaches the maximum count post that it will start throwing an error, default value is false.
3. `joinConsumerGroup(groupName: string, readFrom: string): Promise<ConsumerGroup>`

@@ -134,3 +138,3 @@

*handler*: A callback function which will be invoked when new message a.k.a payload(s) arrive. Should be of signature `(payload: Payload[]) => Promise<boolean>` should be async return from this function is ignored for now, look at `Payload` class below for more details.
*handler*: A callback function which will be invoked when new message a.k.a payload(s) arrive. Should be of signature `(payload: Payload[]) => Promise<number>` should be async & return from this function is number of messages to fetch from redis(expected +ve number; -ve or 0 will unsubscribe from the group stopping all further reads from stream,if NAN then defaults to number provided when subscribing), look at `Payload` class below for more details.

@@ -137,0 +141,0 @@ *pollSpan*: Number of millisecond to wait after completion of handler to check for next available message in stream. Defaulted to 1000 milliseconds.

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc