@soundxyz/redis-pubsub

Full type-safe Redis PubSub with Zod

Version 4.1.2 (latest)

redis-pubsub

Full type-safe Redis PubSub system with async iterators

Features

  • Type-safety with Zod
  • Out-of-the-box support for Date/Map/Set/BigInt serialization with superjson
  • Full usage of Async Iterators
  • Support for AbortController / AbortSignal
  • Support for type-safe filtering using Type Guards / Type predicates
  • Support for Entity + Identifier pattern subscriptions
  • Support for Async Zod Refines and Async Zod Transforms
  • GraphQL API ready

Install

pnpm add @soundxyz/redis-pubsub
npm install @soundxyz/redis-pubsub
yarn add @soundxyz/redis-pubsub

Peer dependencies

pnpm add zod ioredis
npm install zod ioredis
yarn add zod ioredis

Usage

Create a Redis PubSub instance:

import Redis from "ioredis";
import { z } from "zod";

import { RedisPubSub } from "@soundxyz/redis-pubsub";

const { createChannel } = RedisPubSub({
  publisher: new Redis({
    port: 6379,
  }),
  subscriber: new Redis({
    port: 6379,
  }),
});

Create a channel with any Zod schema and a unique "name", which is used as the main trigger.

const schema = z.object({
  id: z.string(),
  name: z.string(),
});

const userChannel = createChannel({
  name: "User",
  schema,
});

const nonLazyUserChannel = createChannel({
  name: "User",
  schema,
  // By default the channels are lazily connected with redis
  isLazy: false,
});

Subscribe and publish to the channel

// Using async iterators / async generators to subscribe
(async () => {
  for await (const user of userChannel.subscribe()) {
    console.log("User", {
      id: user.id,
      name: user.name,
    });
  }
})();

// You can explicitly wait until the channel is successfully connected with Redis
await userChannel.isReady();

// Publish data into the channel
await userChannel.publish(
  {
    value: {
      id: "1",
      name: "John",
    },
  },
  // You can also publish more than a single value
  {
    value: {
      id: "2",
      name: "Peter",
    },
  }
);

Filter based on the data

(async () => {
  for await (const user of userChannel.subscribe({
    filter(value) {
      return value.id === "1";
    },
  })) {
    console.log("User 1", {
      id: user.id,
      name: user.name,
    });
  }
})();

// You can also use type predicates / type guards
(async () => {
  for await (const user of userChannel.subscribe({
    filter(value): value is { id: "1"; name: string } {
      return value.id === "1";
    },
  })) {
    // user.id is narrowed to the literal type "1"
    console.log("User 1", {
      id: user.id,
      name: user.name,
    });
  }
})();
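
The narrowing in the type-guard example above can be reproduced without Redis. The following is a minimal sketch, not the library's implementation: `filterAsync`, `sampleUsers`, `isUserOne` and `collectUserOne` are hypothetical names introduced only for illustration. It shows how a type predicate passed to a filter can change the element type of an async iterator.

```typescript
type User = { id: string; name: string };

// Hypothetical helper (not part of @soundxyz/redis-pubsub): yields only the
// values accepted by the guard, typed as the narrowed type S.
async function* filterAsync<T, S extends T>(
  source: AsyncIterable<T>,
  guard: (value: T) => value is S
): AsyncGenerator<S> {
  for await (const value of source) {
    if (guard(value)) yield value;
  }
}

// Stand-in for a subscription: an async generator with two fixed users.
async function* sampleUsers(): AsyncGenerator<User> {
  yield { id: "1", name: "John" };
  yield { id: "2", name: "Peter" };
}

// Type predicate: narrows `id` from string to the literal type "1".
const isUserOne = (value: User): value is User & { id: "1" } =>
  value.id === "1";

async function collectUserOne(): Promise<Array<User & { id: "1" }>> {
  const matches: Array<User & { id: "1" }> = [];
  for await (const user of filterAsync(sampleUsers(), isUserOne)) {
    // Here `user.id` has the literal type "1", not string.
    matches.push(user);
  }
  return matches;
}
```

A filter that returns a plain boolean would still drop non-matching values, but the loop variable would keep the wider `User` type.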

Use custom identifiers

Each identifier gets its own Redis channel, named by concatenating "name" and "identifier". For example, with "name" = "User" and "identifier" = 1, the channel trigger name is "User1".

(async () => {
  for await (const user of userChannel.subscribe({
    // number or string
    identifier: 1,
  })) {
    console.log("User with identifier=1", {
      id: user.id,
      name: user.name,
    });
  }
})();

await userChannel.isReady({
  // number or string
  identifier: 1,
});

await userChannel.publish({
  value: {
    id: "1",
    name: "John",
  },
  identifier: 1,
});
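
The naming rule described in this section can be sketched as a small function. This is an illustration only: `triggerName` is a hypothetical helper, and the assumption that a channel without an identifier uses the bare "name" is inferred from the text, not confirmed from the library's source.

```typescript
// Hypothetical helper (the library derives the trigger name internally).
// Assumption: with no identifier, the trigger is just the channel name.
function triggerName(name: string, identifier?: string | number): string {
  return identifier === undefined ? name : `${name}${identifier}`;
}

const userTrigger = triggerName("User");       // "User"
const userOneTrigger = triggerName("User", 1); // "User1"

// With plain concatenation, different name/identifier pairs can produce
// the same trigger: "User" + 11 and "User1" + 1 are both "User11".
const collides = triggerName("User", 11) === triggerName("User1", 1);
```

If the concatenation is as plain as described, it may be worth choosing channel names that do not end in characters that could also begin an identifier.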

Separate input from output

You can leverage Zod Transforms to separate input types from output types, and receive any custom class or output on your subscriptions.

class CustomClass {
  constructor(public name: string) {}
}

const inputSchema = z.string();
const outputSchema = z.string().transform((input) => new CustomClass(input));

const channel = createChannel({
  name: "separate-type",
  inputSchema,
  outputSchema,
});

const subscription = (async () => {
  for await (const data of channel.subscribe()) {
    return data;
  }
})();

await channel.isReady();

await channel.publish({
  value: "test",
});

const result = await subscription;

// true
console.log(result instanceof CustomClass);

// true
console.log(result.name === "test");

Use AbortController / AbortSignal

If isLazy is not disabled, the channel is automatically unsubscribed from Redis once its last subscription ends.

const abortController = new AbortController();
const abortedSubscription = (async () => {
  for await (const data of userChannel.subscribe({
    abortSignal: abortController.signal,
  })) {
    console.log({ data });
  }
})();

// ...

abortController.abort();

await abortedSubscription;
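
To show the general mechanism behind an abortable `for await` loop, here is a self-contained sketch that runs without Redis. It is not how the library implements `abortSignal`: `untilAborted`, `ticker` and `demo` are hypothetical names, and the approach (racing each `next()` call against the signal) is just one common way to end an async iterator early.

```typescript
// Hypothetical helper (not part of @soundxyz/redis-pubsub): ends iteration
// as soon as the signal aborts, even if the source is still waiting.
async function* untilAborted<T>(
  source: AsyncIterable<T>,
  signal: AbortSignal
): AsyncGenerator<T> {
  if (signal.aborted) return;
  const iterator = source[Symbol.asyncIterator]();
  // Resolves with a "done" result when the signal fires.
  const aborted = new Promise<IteratorResult<T>>((resolve) => {
    signal.addEventListener(
      "abort",
      () => resolve({ done: true, value: undefined }),
      { once: true }
    );
  });
  try {
    while (true) {
      const result = await Promise.race([iterator.next(), aborted]);
      if (result.done) return;
      yield result.value;
    }
  } finally {
    // Ask the source to clean up; not awaited so a stalled source cannot block us.
    void iterator.return?.();
  }
}

// Stand-in for a subscription: emits 0, 1, 2, ... every 10 ms, forever.
async function* ticker(): AsyncGenerator<number> {
  let n = 0;
  while (true) {
    await new Promise((resolve) => setTimeout(resolve, 10));
    yield n++;
  }
}

async function demo(): Promise<number[]> {
  const abortController = new AbortController();
  const seen: number[] = [];
  for await (const value of untilAborted(ticker(), abortController.signal)) {
    seen.push(value);
    // Abort after three values; the loop then finishes instead of hanging.
    if (seen.length === 3) abortController.abort();
  }
  return seen;
}
```

The same pattern applies to the subscription above: the loop finishes normally after the abort, so awaiting the wrapping promise resolves instead of waiting for further messages.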

Unsubscribe specific identifiers

await userChannel.unsubscribe(
  {
    identifier: 1,
  },
  // You can specify more than a single identifier at once
  {
    identifier: 2,
  }
);

Unsubscribe an entire channel

await userChannel.unsubscribeAll();

Close the PubSub instance

const pubSub = RedisPubSub({
  publisher: new Redis({
    port: 6379,
  }),
  subscriber: new Redis({
    port: 6379,
  }),
});

// ...
await pubSub.close();

Package last updated on 21 Jul 2023
