
redis-x-stream


An async iterable interface for redis streams



Create async iterables that emit redis stream entries. Requires Redis 5 or greater.

Getting Started

import { RedisStream } from 'redis-x-stream'
import Redis from 'ioredis'

const myStream = 'my-stream'
// Write 100,000 entries before reading them back.
await populate(myStream, 1e5)

let i = 0
// Each iteration yields one entry as [streamName, [id, keyvals]].
for await (const [streamName, [id, keyvals]] of new RedisStream(myStream)) {
  i++
}
console.log(`read ${i} stream entries from ${myStream}`)

async function populate(stream, count) {
  // Auto-pipelining lets ioredis batch the XADD commands issued below.
  const writer = new Redis({ enableAutoPipelining: true })
  await Promise.all(
    Array.from(Array(count), (_, j) => writer.xadd(stream, '*', 'index', j))
  )
  writer.quit()
  // Wait until the writer connection has fully closed.
  await new Promise(resolve => writer.once('close', resolve))
  console.log(`wrote ${count} stream entries to ${stream}`)
}
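
The loop above destructures each entry as `[streamName, [id, keyvals]]`. As a minimal sketch, assuming `keyvals` arrives as a flat array of alternating field names and values (the shape of a raw Redis stream reply), the fields could be turned into a plain object as follows. `keyvalsToObject` is a hypothetical helper written for this illustration and is not part of redis-x-stream.

```javascript
// Hypothetical helper, not part of redis-x-stream.
// Assumes keyvals is a flat array: [field1, value1, field2, value2, ...].
function keyvalsToObject(keyvals) {
  const entry = {}
  for (let i = 0; i + 1 < keyvals.length; i += 2) {
    entry[String(keyvals[i])] = keyvals[i + 1]
  }
  return entry
}

// A hand-written sample entry in the same tuple shape the loop destructures.
const sample = ['my-stream', ['1673222400000-0', ['index', '42']]]
const [sampleStream, [sampleId, sampleKeyvals]] = sample
const fields = keyvalsToObject(sampleKeyvals)
console.log(sampleStream, sampleId, fields.index)
```

If the entries turn out to have a different shape in your configuration, only the helper needs to change; the iteration itself stays the same.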

Usage

See the API Docs for available options.

Advanced Usage

Task Processing

If you have a cluster of processes reading Redis stream entries, you likely want to use Redis consumer groups.

A task processing application may look like the following:

const control = {
  /* some control event emitter */
}
const stream = new RedisStream({
  streams: ['my-stream'],
  group: 'my-group',
  // e.g. a k8s StatefulSet hostname or a Cloud Foundry instance index
  consumer: 'tpc_' + process.env.SOME_ORDINAL_IDENTIFIER,
  block: Infinity,
  count: 10,
  deleteOnAck: true,
})
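// Semaphore is not part of redis-x-stream; this assumes any counting semaphore that exposes acquire() and release()
const lock = new Semaphore(11)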
const release = lock.release.bind(lock)

control.on('new-source', (streamName) => {
  //Add an additional source stream to a blocked stream.
  stream.addStream(streamName)
})
control.on('shutdown', async () => {
  //drain will process all claimed entries (the PEL) and stop iteration
  await stream.drain()
})

async function tryTask(stream, streamName, id, entry) {
  //...process entry...
  stream.ack(streamName, id)
}

for await (const [streamName, [id, keyvals]] of stream) {
  await lock.acquire()
  void tryTask(stream, streamName, id, keyvals).finally(release)
}
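
The acquire/release pattern in the loop above caps how many tasks run at once. The following is a self-contained sketch of that pattern which runs without Redis. The `Semaphore` class, `fakeStream`, and `run` are all written for this illustration and are assumptions, not part of redis-x-stream; `fakeStream` only imitates the `[streamName, [id, keyvals]]` tuple shape.

```javascript
// Minimal counting semaphore, written here for illustration only.
class Semaphore {
  constructor(permits) {
    this.permits = permits
    this.waiters = []
  }
  async acquire() {
    if (this.permits > 0) {
      this.permits--
      return
    }
    await new Promise((resolve) => this.waiters.push(resolve))
  }
  release() {
    const next = this.waiters.shift()
    if (next) next() // hand the permit directly to the next waiter
    else this.permits++
  }
}

// Stand-in for a RedisStream: yields entries in the same tuple shape.
async function* fakeStream(count) {
  for (let i = 0; i < count; i++) {
    yield ['my-stream', [`${i}-0`, ['index', String(i)]]]
  }
}

async function run(limit, count) {
  const lock = new Semaphore(limit)
  const release = lock.release.bind(lock)
  let inFlight = 0
  let maxInFlight = 0
  let processed = 0

  async function tryTask(id) {
    inFlight++
    maxInFlight = Math.max(maxInFlight, inFlight)
    await new Promise((resolve) => setTimeout(resolve, 5)) // simulated work
    inFlight--
    processed++
  }

  for await (const [, [id]] of fakeStream(count)) {
    await lock.acquire() // pauses iteration while `limit` tasks are running
    void tryTask(id).finally(release)
  }
  // Re-acquire every permit to wait for the tasks still in flight.
  for (let i = 0; i < limit; i++) await lock.acquire()
  return { processed, maxInFlight }
}
```

Calling `run(3, 10)` resolves once all ten simulated entries are processed, with `maxInFlight` never exceeding 3. Because the loop awaits `acquire()` before starting each task, a slow task applies backpressure to the iterator instead of letting unprocessed entries pile up.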


Last updated on 09 Jan 2023
