redis-streams-broker
Advanced tools
Comparing version 0.0.7 to 0.0.8
@@ -1,2 +0,2 @@ | ||
import redisNs from '@types/ioredis' | ||
import redisNs from 'ioredis' | ||
export declare class StreamChannelBroker { | ||
@@ -7,3 +7,3 @@ constructor(redisClient: redisNs.Redis, channelName: string); | ||
joinConsumerGroup(groupName: string, readFrom: string): Promise<ConsumerGroup>; | ||
memoryFootprint(): Promise<integer>; | ||
memoryFootprint(): Promise<number>; | ||
} | ||
@@ -10,0 +10,0 @@ |
17
index.js
@@ -57,10 +57,13 @@ const shortid = require("shortid"); | ||
const intervalHandle = setTimeout(async () => { | ||
const messages = await this._redisClient.xreadgroup("GROUP", groupName, consumerName, "COUNT", payloadsToFetch, "STREAMS", this._channelName, ">"); | ||
if (messages !== null) { | ||
await this._unsubscribe(subscriptionHandle); | ||
let streamPayloads = this._transformResponseToMessage(messages, groupName); | ||
await handler(streamPayloads); | ||
try { | ||
const messages = await this._redisClient.xreadgroup("GROUP", groupName, consumerName, "COUNT", payloadsToFetch, "STREAMS", this._channelName, ">"); | ||
if (messages !== null) { | ||
let streamPayloads = this._transformResponseToMessage(messages, groupName); | ||
await handler(streamPayloads); | ||
} | ||
} | ||
if (this._destroying === false) { | ||
await this._subscribe(groupName, consumerName, handler, pollSpan, payloadsToFetch, subscriptionHandle); | ||
finally { | ||
if (this._destroying === false && this._unsubscribe(subscriptionHandle)) { | ||
await this._subscribe(groupName, consumerName, handler, pollSpan, payloadsToFetch, subscriptionHandle); | ||
} | ||
} | ||
@@ -67,0 +70,0 @@ }, pollSpan); |
@@ -0,0 +0,0 @@ This is free and unencumbered software released into the public domain. |
{ | ||
"name": "redis-streams-broker", | ||
"version": "0.0.7", | ||
"version": "0.0.8", | ||
"description": "This package is a broker to redis stream data type, This package provides guaranteed message delivery feature with acknowledgement.", | ||
"main": "index.js", | ||
"scripts": { | ||
"mocha": "./node_modules/mocha/bin/mocha", | ||
"mocha": "mocha", | ||
"test": "npm run redisstart && npm run mocha && npm run redisstop", | ||
"redisstop": "docker stop streamz", | ||
"redisstart": "npm run redisstop || true && docker run --name streamz -p 6379:6379 -itd --rm redis:latest" | ||
"redisstart": "npm run redisstop || node -v && docker run --name streamz -p 6379:6379 -itd --rm redis:latest" | ||
}, | ||
@@ -12,0 +12,0 @@ "repository": { |
@@ -73,3 +73,3 @@ # redis-streams-broker | ||
## Current Version: | ||
0.0.7[Beta] | ||
0.0.8[Beta] | ||
@@ -76,0 +76,0 @@ ## License |
Sorry, the diff of this file is not supported yet
15278
6
190