Relief Valve
This package is based on redis stream data type and provides you with following features
- It is agnostic to any redis client library can be used with redis-abstraction.
- This package acts like a pressure relief value used in plumbing; this pattern is used to batch multiple messages in the stream into a single group and deliver it to consumer, used case for data ingestions and other places where batching is needed.
- It can be used as simple que without batching, for sharding messages within multiple consumers for H-Scalling
- It provides guarantee for message delivery and or processing with acknowledge facility.
- Provides facility to reprocess lost messages(which are not acked over a period of time).
- Can used as in fan-out topology(each consumer receives the copy of the message).
- Batching of messages with respect to count of messages.
- Batching of messages with respect to time elapsed from the last write to the stream.
- Its Redis cluster compatible package.
- It provides accumalator sharding functionality
Getting Started
- Install using
npm -i relief-valve
- Require in your project.
const rvType = require('relief-valve').ReliefValve;
or import { IBatchIdentity, IRedisClient, ReliefValve } from 'relief-valve'
- Run redis on local docker if required.
docker run --name streamz -p 6379:6379 -itd --rm redis:latest
- Instantiate with a redis client and name for the stream, group name, client name and thresholds.
const publisherInstance = new ReliefValve(client, name, 1, 1, "PubGroup", "Publisher1");
- All done, Start using it!!.
Examples/Code snippets
- Please find example code usage in component tests
async function main(redisPool)
const batchsize = 10;
const publisherInstance = new ReliefValve(redisPool, name, batchsize, 1, "PubGroup", "Publisher1");
const consumerInstance1 = new ReliefValve(redisPool, name, batchsize, 1, "ShardGroup1", "Consumer1");
let payloads = new Map<string, object>();
for (let counter = 0; counter < 100; counter++) {
const payload = { "hello": "world1", "A": "1", "Z": "26", "B": "2", "counter": counter.toString() };
const generatedId = await publisherInstance.publish(payload);
payloads.set(generatedId, payload);
const consumer1Result = await consumerInstance1.consumeFreshOrStale(3600);
assert.notStrictEqual(generatedId, undefined);
assert.notStrictEqual(generatedId, null);
assert.notStrictEqual(generatedId, "");
if (payloads.size === batchsize) {
if (consumer1Result == undefined) throw new Error("Read failed no batch found");
assert.notStrictEqual(, undefined);
assert.notStrictEqual(, null);
assert.notStrictEqual(, "");
assert.strictEqual(consumer1Result.readsInCurrentGroup, 1);
assert.strictEqual(consumer1Result.payload.size, batchsize);
assert.deepStrictEqual(consumer1Result.payload, payloads);
const ackResult = await consumerInstance1.acknowledge(consumer1Result as IBatchIdentity);
assert.deepStrictEqual(ackResult, true);
payloads = new Map<string, object>();
else {
assert.deepStrictEqual(consumer1Result, undefined);
const token = redisPool.generateUniqueToken('Test');
try {
await redisPool.acquire(token);
const keys = await, ["KEYS", "*"]);
const length = await, ["XLEN", name]);
assert.deepStrictEqual(keys, [name]);
assert.deepStrictEqual(length, 0);
finally {
await redisPool.release(token);
Built with
- Authors :heart: for Open Source.
- New ideas/techniques are welcomed.
- Raise a Pull Request.
This project is contrubution to public domain and completely free for use, view file for details.
Quick Tips & Usage Patterns
- The package should be instantiated on publisher and subscriber side with identitcal parameters in constructor apart from groupName and clientName, else will lead to chaotic behaviour of pulling messages.
- Count threshold is always evlauted on writes/publish into stream.
- Time threshold needs external invocation as redis currently doesnot support cron jobs, either subscriber or publisher can invoke this validation.
- Highest accurary of time threshold is limited to one second, but depends heavily on external invocation frequency.
- Typing info included with the package.
- Type doc[W.I.P]
Example impementation of Redis Pool
A pooled implmentation using redis-abstraction
import { IORedisClientPool } from 'redis-abstraction';
const singleNodeRedisConnectionString = 'rediss://';
const connectionInjector = () => IORedisClientPool.IORedisClientClusterFactory([singleNodeRedisConnectionString]);
const pool = new IORedisClientPool(connectionInjector);
.finally(async () => {
await pool.shutdown()