redis-streams-broker
Advanced tools
Comparing version 0.0.10 to 0.0.11
export declare class StreamChannelBroker { | ||
constructor(redisClient: any, channelName: string); | ||
publish(payload: any, maximumApproximateMessages?: number): Promise<string>; | ||
publish(payload: any, maximumApproximateMessages?: number, failOnMaxMessageCount: boolean): Promise<string>; | ||
destroy(): Promise<boolean>; | ||
@@ -13,3 +13,3 @@ joinConsumerGroup(groupName: string, readFrom: string): Promise<ConsumerGroup>; | ||
readFrom: string; | ||
subscribe(consumerName: string, handler: (payload: Payload[]) => Promise<boolean>, pollSpan?: number, payloadsToFetch?: number, subscriptionHandle?: string, readPending?: boolean): Promise<string>; | ||
subscribe(consumerName: string, handler: (payload: Payload[]) => Promise<number>, pollSpan?: number, payloadsToFetch?: number, subscriptionHandle?: string, readPending?: boolean): Promise<string>; | ||
unsubscribe(subscriptionHandle: string): Promise<boolean>; | ||
@@ -16,0 +16,0 @@ pendingSummary(): Promise<GroupSummary>; |
38
index.js
const shortid = require("shortid"); | ||
const path = require('path'); | ||
const Scripto = require("redis-scripto"); | ||
class StreamChannelBroker { | ||
constructor(redisClient, channelName) { | ||
constructor(redisClient, channelName, scriptManager = new Scripto(redisClient)) { | ||
this._scriptManager = scriptManager; | ||
this._redisClient = redisClient; | ||
@@ -21,2 +22,3 @@ this._destroying = false; | ||
this._groupPendingSummary = this._destroyingCheckWrapper(this._groupPendingSummary.bind(this), false); | ||
this._scriptManager.loadFromDir('./scripts'); | ||
} | ||
@@ -47,10 +49,14 @@ | ||
try { | ||
const messages = await this._redisClient.xreadgroup("GROUP", groupName, consumerName, "COUNT", payloadsToFetch, "BLOCK", pollSpan, "STREAMS", this._channelName, (readPending === false ? ">" : "0")); | ||
const messages = await this._redisClient.xreadgroup("GROUP", groupName, consumerName, "BLOCK", pollSpan, "COUNT", payloadsToFetch, "STREAMS", this._channelName, (readPending === false ? ">" : "0")); | ||
if (messages !== null) { | ||
let streamPayloads = this._transformResponseToMessage(messages, groupName); | ||
await handler(streamPayloads); | ||
if (streamPayloads.length === 0 & readPending === true) {// The server should respond back with zero and not with null response. //Look at usage example https://redis.io/commands/xreadgroup | ||
//This means all pending messages are processed. | ||
//This means all pending messages are processed for this consumer name. | ||
readPending = false; | ||
} | ||
let nextPayloadToFetch = await handler(streamPayloads); | ||
if (nextPayloadToFetch != null && !Number.isNaN(nextPayloadToFetch) && nextPayloadToFetch != "") { | ||
payloadsToFetch = Number.parseInt(nextPayloadToFetch); | ||
} | ||
} | ||
@@ -60,3 +66,5 @@ } | ||
if (this._destroying === false && this._unsubscribe(subscriptionHandle)) { | ||
await this._subscribe(groupName, consumerName, handler, pollSpan, payloadsToFetch, subscriptionHandle, readPending); | ||
if (payloadsToFetch > 0) { | ||
await this._subscribe(groupName, consumerName, handler, pollSpan, payloadsToFetch, subscriptionHandle, readPending); | ||
} | ||
} | ||
@@ -150,3 +158,3 @@ } | ||
async publish(payload, maximumApproximateMessages = 100) { | ||
async publish(payload, maximumApproximateMessages = 100, failOnMaxMessageCount = false) { | ||
let keyValuePairs = []; | ||
@@ -176,2 +184,15 @@ const payloadType = typeof payload; | ||
} | ||
else if (maximumApproximateMessages > 0 && failOnMaxMessageCount === true) { | ||
return new Promise((acc, rej) => { | ||
this._scriptManager.run('addWithLimit', [this._channelName], [maximumApproximateMessages, ...keyValuePairs], (err, result) => { | ||
if (err !== null) { | ||
return rej(err); | ||
} | ||
if (result == null) { | ||
return rej(new Error(`Maximum length exceeded!!, limit(${maximumApproximateMessages})`)); | ||
} | ||
acc(result); | ||
}); | ||
}); | ||
} | ||
else { | ||
@@ -194,2 +215,3 @@ return await this._redisClient.xadd(this._channelName, 'MAXLEN', '~', maximumApproximateMessages, '*', ...keyValuePairs); | ||
exports.StreamChannelBroker = StreamChannelBroker; | ||
exports.StreamChannelBroker = StreamChannelBroker; | ||
exports.AsyncProcessor = require('./asyncProcessor'); |
{ | ||
"name": "redis-streams-broker", | ||
"version": "0.0.10", | ||
"version": "0.0.11", | ||
"description": "This package is a broker to redis stream data type, This package provides guaranteed message delivery feature with acknowledgement.", | ||
@@ -34,2 +34,3 @@ "main": "index.js", | ||
"dependencies": { | ||
"redis-scripto": "^0.1.3", | ||
"shortid": "^2.2.15" | ||
@@ -36,0 +37,0 @@ }, |
@@ -23,2 +23,3 @@ # redis-streams-broker | ||
3. Please find multi threading examples [here](https://github.com/LRagji/redis-streams-broker/tree/master/examples/H-Scalling%20idempotent%20operation) | ||
4. Please find async processing examples [here](https://github.com/LRagji/redis-streams-broker/tree/master/examples/Stream%20Processing) | ||
@@ -72,3 +73,4 @@ ```javascript | ||
1. Authors :heart for Open Source. | ||
2. [shortid](https://www.npmjs.com/package/shortid). | ||
2. [shortid](https://www.npmjs.com/package/shortid) for auto generating subscribtion handles. | ||
3. [redis-scripto](https://www.npmjs.com/package/redis-scripto) for handling lua scripts. | ||
@@ -99,3 +101,3 @@ ## Contributions | ||
2. `publish(payload: any, maximumApproximateMessages?: number): Promise<string>;` | ||
2. `publish(payload: any, maximumApproximateMessages?: number, failOnMaxMessageCount:boolean): Promise<string>;` | ||
@@ -108,2 +110,4 @@ Publishes provided message into the stream and returns id generated by server. | ||
*failOnMaxMessageCount*: if *maximumApproximateMessages* and *failOnMaxMessageCount* is set to true then it will only publish messages untill it reaches the maximum count post that it will start throwing an error, default value is false. | ||
3. `joinConsumerGroup(groupName: string, readFrom: string): Promise<ConsumerGroup>` | ||
@@ -134,3 +138,3 @@ | ||
*handler*: A callback function which will be invoked when new message a.k.a payload(s) arrive. Should be of signature `(payload: Payload[]) => Promise<boolean>` should be async return from this function is ignored for now, look at `Payload` class below for more details. | ||
*handler*: A callback function which will be invoked when new message a.k.a payload(s) arrive. Should be of signature `(payload: Payload[]) => Promise<number>` should be async & return from this function is number of messages to fetch from redis(expected +ve number; -ve or 0 will unsubscribe from the group stopping all further reads from stream,if NAN then defaults to number provided when subscribing), look at `Payload` class below for more details. | ||
@@ -137,0 +141,0 @@ *pollSpan*: Number of millisecond to wait after completion of handler to check for next available message in stream. Defaulted to 1000 milliseconds. |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
25227
7
291
173
2
+ Addedredis-scripto@^0.1.3
+ Addeddebug@0.7.4(transitive)
+ Addedredis@0.8.6(transitive)
+ Addedredis-scripto@0.1.3(transitive)