relief-valve


A simple library built on the Redis Streams data type. It accumulates messages until a specified threshold is reached, after which the batch is made available to the consumer stream.

Relief Valve

This package is based on the Redis Streams data type and provides the following features:

  1. It is agnostic to the Redis client library: it works with ioredis, redis, or any other client implementation, as long as that client satisfies the IRedisClient interface.
  2. It acts like the pressure relief valve used in plumbing: multiple messages in the stream are batched into a single group and delivered to the consumer. Typical use cases are data ingestion and other places where batching is needed.
  3. It can be used as a simple queue without batching, sharding messages across multiple consumers for horizontal scaling.
  4. It guarantees message delivery and/or processing through an acknowledgement facility.
  5. It can reprocess lost messages (those not acknowledged within a period of time).
  6. It can be used in a fan-out topology (each consumer receives a copy of the message).
  7. It batches messages by message count.
  8. It batches messages by time elapsed since the last write to the stream.
  9. It is Redis Cluster compatible.
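
The count-based batching described in the list above can be pictured with a small in-memory model. This is only a sketch of the pattern, not the library's implementation: the CountBatcher class and its publish and consume methods are hypothetical names, and no Redis is involved.

```typescript
// Hypothetical in-memory model of count-based batching.
// It illustrates the pattern only; it is not how relief-valve is implemented.
type Payload = Record<string, string>;

class CountBatcher {
    private pending = new Map<string, Payload>();     // messages still accumulating
    private ready: Array<Map<string, Payload>> = [];  // released batches awaiting a consumer
    private nextId = 1;
    private readonly countThreshold: number;

    constructor(countThreshold: number) {
        if (countThreshold < 1) throw new RangeError("countThreshold must be >= 1");
        this.countThreshold = countThreshold;
    }

    // Accumulate one message; release the whole batch once the count threshold is reached.
    publish(payload: Payload): string {
        const id = String(this.nextId++);
        this.pending.set(id, payload);
        if (this.pending.size >= this.countThreshold) {
            this.ready.push(this.pending);
            this.pending = new Map<string, Payload>();
        }
        return id;
    }

    // Return the oldest released batch, or undefined while nothing has been released.
    consume(): Map<string, Payload> | undefined {
        return this.ready.shift();
    }
}

const batcher = new CountBatcher(3);
batcher.publish({ counter: "0" });
batcher.publish({ counter: "1" });
const beforeThreshold = batcher.consume(); // nothing released yet: only two messages
batcher.publish({ counter: "2" });
const firstBatch = batcher.consume();      // the third publish released the batch
```

With a threshold of 1, every publish releases immediately, which corresponds to the "simple queue without batching" case.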

Getting Started

  1. Install using npm i relief-valve
  2. Require it in your project: const rvType = require('relief-valve').ReliefValve; or import { IBatchIdentity, IRedisClient, ReliefValve } from 'relief-valve'
  3. Run Redis in a local Docker container if required: docker run --name streamz -p 6379:6379 -itd --rm redis:latest
  4. Instantiate with a Redis client, a name for the stream, the thresholds, a group name, and a client name: const publisherInstance = new ReliefValve(client, name, 1, 1, "PubGroup", "Publisher1");
  5. All done. Start using it!
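
The client passed to the constructor has to satisfy IRedisClient, whose exact shape is not shown here. The sample code in this README calls client.run([...]) with an array of command arguments, so the sketch below assumes a single run method; check the typings shipped with the package before relying on it. RunCapableClient, CallCapable, and CallAdapter are hypothetical names. The adapter forwards to a generic call(command, ...args) method such as the one ioredis exposes; a recording stand-in replaces the real client so the sketch runs without a Redis server.

```typescript
// Assumed shape, inferred from client.run(["KEYS", "*"]) in the sample code.
// The real IRedisClient interface may declare more members or different types.
interface RunCapableClient {
    run(commandArgs: string[]): Promise<unknown>;
}

// Minimal structural type for a client with a generic call(command, ...args) method.
interface CallCapable {
    call(command: string, ...args: string[]): Promise<unknown>;
}

// Hypothetical adapter: splits the argument array into command name and arguments.
class CallAdapter implements RunCapableClient {
    private readonly inner: CallCapable;

    constructor(inner: CallCapable) {
        this.inner = inner;
    }

    run(commandArgs: string[]): Promise<unknown> {
        const [command, ...args] = commandArgs;
        if (command === undefined) {
            return Promise.reject(new Error("Empty command"));
        }
        return this.inner.call(command, ...args);
    }
}

// Stand-in client that records every call instead of talking to Redis.
const recorded: string[][] = [];
const fakeClient: CallCapable = {
    call: async (command: string, ...args: string[]) => {
        recorded.push([command, ...args]);
        return "OK";
    },
};

const adapter = new CallAdapter(fakeClient);
const pendingReply = adapter.run(["XLEN", "myStream"]);
```

In real use the stand-in would be replaced by an actual client instance; whether a given client type checks against CallCapable without a cast is not verified here.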

Examples/Code snippets

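  1. Please find example code usage in component tests
  2. Please find an example implementation of an ioredis client here
// Count-based batching test.
// Assumes `client` (an IRedisClient implementation) and `name` (the stream name) are created by the test setup.
import * as assert from 'assert';
import { IBatchIdentity, ReliefValve } from 'relief-valve';

const batchsize = 10;
const publisherInstance = new ReliefValve(client, name, batchsize, 1, "PubGroup", "Publisher1");
const consumerInstance1 = new ReliefValve(client, name, batchsize, 1, "ShardGroup1", "Consumer1");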

let payloads = new Map<string, object>();
for (let counter = 0; counter < 100; counter++) {

    const payload = { "hello": "world1", "A": "1", "Z": "26", "B": "2", "counter": counter.toString() };
    const generatedId = await publisherInstance.publish(payload);
    payloads.set(generatedId, payload);

    //Test
    const consumer1Result = await consumerInstance1.consumeFreshOrStale(3600);

    //Verify
    assert.notStrictEqual(generatedId, undefined);
    assert.notStrictEqual(generatedId, null);
    assert.notStrictEqual(generatedId, "");
    if (payloads.size === batchsize) {
        if (consumer1Result == undefined) throw new Error("Read failed no batch found");
        assert.notStrictEqual(consumer1Result.id, undefined);
        assert.notStrictEqual(consumer1Result.id, null);
        assert.notStrictEqual(consumer1Result.id, "");
        assert.notStrictEqual(consumer1Result.name, undefined);
        assert.notStrictEqual(consumer1Result.name, null);
        assert.notStrictEqual(consumer1Result.name, "");
        assert.strictEqual(consumer1Result.readsInCurrentGroup, 1);
        assert.strictEqual(consumer1Result.payload.size, batchsize);
        assert.deepStrictEqual(consumer1Result.payload, payloads);
        const ackResult = await consumerInstance1.acknowledge(consumer1Result as IBatchIdentity);
        assert.deepStrictEqual(ackResult, true);
        payloads = new Map<string, object>();
    }
    else {
        assert.deepStrictEqual(consumer1Result, undefined);
    }
}
const keys = await client.run(["KEYS", "*"]);
const length = await client.run(["XLEN", name]);
assert.deepStrictEqual(keys, [name]);
assert.deepStrictEqual(length, 0);

Built with

  1. Authors ❤️ for Open Source.

Contributions

  1. New ideas/techniques are welcome.
  2. Raise a Pull Request.

Current Version:

0.0.1 [Beta]

License

This project is a contribution to the public domain and is completely free to use; see the LICENSE.md file for details.

Quick Tips & Usage Patterns:

  1. Instantiate the package on the publisher and subscriber sides with identical constructor parameters, apart from groupName and clientName; otherwise messages will be pulled in unpredictable ways.
  2. The count threshold is always evaluated on writes (publish) to the stream.
  3. The time threshold needs external invocation, as Redis currently does not support cron jobs; either the subscriber or the publisher can invoke this validation.
  4. The best achievable accuracy of the time threshold is one second, and in practice it depends heavily on how frequently the check is invoked externally.
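
Tips 3 and 4 can be illustrated with a small sketch of an externally driven time check. Everything here is hypothetical (isTimeThresholdReached, TimeThresholdChecker, and the injectable clock); it models the idea of comparing whole seconds since the last write against a threshold and does not use the library's API.

```typescript
// Hypothetical helper: is a time-based release due?
// Works in whole seconds, matching the one-second accuracy limit stated above.
function isTimeThresholdReached(lastWriteEpochSec: number, nowEpochSec: number, thresholdSec: number): boolean {
    return nowEpochSec - lastWriteEpochSec >= thresholdSec;
}

// Hypothetical driver: nothing inside Redis fires on a schedule,
// so a publisher or subscriber process has to call check() periodically.
class TimeThresholdChecker {
    private lastWriteEpochSec: number;
    private readonly thresholdSec: number;
    private readonly nowSec: () => number;

    constructor(thresholdSec: number, nowSec: () => number = () => Math.floor(Date.now() / 1000)) {
        this.thresholdSec = thresholdSec;
        this.nowSec = nowSec;
        this.lastWriteEpochSec = nowSec();
    }

    // Call on every publish so the timer restarts from the last write.
    recordWrite(): void {
        this.lastWriteEpochSec = this.nowSec();
    }

    // Call from an external timer; returns true when the pending batch should be released.
    check(): boolean {
        return isTimeThresholdReached(this.lastWriteEpochSec, this.nowSec(), this.thresholdSec);
    }
}

// Simulated clock so the example is deterministic.
let fakeNow = 100;
const checker = new TimeThresholdChecker(5, () => fakeNow);
checker.recordWrite();             // last write at t = 100
fakeNow = 103;
const dueAt103 = checker.check();  // 3 s elapsed, threshold is 5 s
fakeNow = 105;
const dueAt105 = checker.check();  // 5 s elapsed, threshold reached
```

In a running service, check() would typically be driven by something like setInterval. The worst-case delay between the threshold elapsing and the release is then roughly one polling interval, which is why the invocation frequency matters as much as the one-second resolution.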

API

  1. Typing info is included with the package.
  2. TypeDoc [W.I.P]

Package last updated on 03 May 2022
