redis-pubsub
Fully type-safe Redis PubSub system with async iterators
Features
Install
pnpm add @soundxyz/redis-pubsub
npm install @soundxyz/redis-pubsub
yarn add @soundxyz/redis-pubsub
Peer dependencies
pnpm add zod ioredis
npm install zod ioredis
yarn add zod ioredis
Usage
Create a Redis PubSub instance:
import Redis from "ioredis";
import { z } from "zod";
import { RedisPubSub } from "@soundxyz/redis-pubsub";
const { createChannel } = RedisPubSub({
publisher: new Redis({
port: 6379,
}),
subscriber: new Redis({
port: 6379,
}),
});
Create a channel with any Zod schema and a unique "name" to be used as the main trigger.
const schema = z.object({
id: z.string(),
name: z.string(),
});
const userChannel = createChannel({
name: "User",
schema,
});
const nonLazyUserChannel = createChannel({
name: "User",
schema,
isLazy: false,
});
Subscribe and publish to the channel
(async () => {
for await (const user of userChannel.subscribe()) {
console.log("User", {
id: user.id,
name: user.name,
});
}
})();
await userChannel.isReady();
await userChannel.publish(
{
value: {
id: "1",
name: "John",
},
},
{
value: {
id: "2",
name: "Peter",
},
}
);
Filter based on the data
(async () => {
for await (const user of userChannel.subscribe({
filter(value) {
return value.id === "1";
},
})) {
console.log("User 1", {
id: user.id,
name: user.name,
});
}
})();
(async () => {
for await (const user of userChannel.subscribe({
filter(value): value is { id: "1"; name: string } {
return value.id === "1";
},
})) {
console.log("User 1", {
id: user.id,
name: user.name,
});
}
})();
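The second filter above is written as a TypeScript type predicate (`value is { id: "1"; name: string }`), which lets the compiler narrow the type of the values that pass it. As a rough, dependency-free illustration of that narrowing, the sketch below applies the same kind of predicate to a plain array. The names `User`, `UserOne`, `isUserOne`, `users`, and `onlyUserOne` are made up for this example and are not part of the library.

```typescript
type User = { id: string; name: string };
type UserOne = { id: "1"; name: string };

// A type predicate: returning true tells the compiler that `value` is a UserOne.
function isUserOne(value: User): value is UserOne {
  return value.id === "1";
}

const users: User[] = [
  { id: "1", name: "John" },
  { id: "2", name: "Peter" },
];

// Array.prototype.filter has an overload for type predicates,
// so the result is typed as UserOne[] rather than User[].
const onlyUserOne: UserOne[] = users.filter(isUserOne);
```

A filter that returns a plain `boolean`, like the first example, still filters the data but leaves the static type unchanged.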
Use custom identifiers
Each identifier gets a separate Redis channel whose trigger name is the concatenation of "name" and "identifier". For example, with "name" = "User" and "identifier" = 1, the channel trigger name will be "User1".
(async () => {
for await (const user of userChannel.subscribe({
identifier: 1,
})) {
console.log("User with identifier=1", {
id: user.id,
name: user.name,
});
}
})();
await userChannel.isReady({
identifier: 1,
});
await userChannel.publish({
value: {
id: "1",
name: "John",
},
identifier: 1,
});
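To make the naming rule concrete, here is a minimal sketch of the concatenation described above. `channelTrigger` is a hypothetical helper written for this illustration, not a function exported by the library, and accepting string identifiers is an assumption (the examples here only use numbers).

```typescript
// Hypothetical helper illustrating the naming rule described above;
// it is not part of the library's API.
function channelTrigger(name: string, identifier?: string | number): string {
  // Assumption: without an identifier, the trigger is just the channel name.
  return identifier === undefined ? name : `${name}${identifier}`;
}

const userOne = channelTrigger("User", 1);

// With no separator between the two parts, different pairs can collide.
const collisionA = channelTrigger("User", 11);
const collisionB = channelTrigger("User1", 1);
```

If the trigger name really is a bare concatenation, a channel named "User" with identifier 11 and a channel named "User1" with identifier 1 would share a trigger, so it may be worth choosing channel names that cannot overlap this way.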
Separate input from output
You can leverage Zod transforms to separate the input types from the output types, so your subscriptions can receive a custom class or any other transformed output.
class CustomClass {
constructor(public name: string) {}
}
const inputSchema = z.string();
const outputSchema = z.string().transform((input) => new CustomClass(input));
const channel = createChannel({
name: "separate-type",
inputSchema,
outputSchema,
});
const subscription = (async () => {
for await (const data of channel.subscribe()) {
return data;
}
})();
await channel.isReady();
await channel.publish({
value: "test",
});
const result = await subscription;
console.log(result instanceof CustomClass);
console.log(result.name === "test");
Use AbortController / AbortSignal
If isLazy is not disabled, the channel is automatically unsubscribed from Redis when its last subscription ends.
const abortController = new AbortController();
const abortedSubscription = (async () => {
for await (const data of userChannel.subscribe({
abortSignal: abortController.signal,
})) {
console.log({ data });
}
})();
abortController.abort();
await abortedSubscription;
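One way to picture the isLazy note above is as a count of active subscriptions per channel. The sketch below is only a mental model under that assumption: `ChannelModel` and all of its members are hypothetical and do not reflect the library's internals, and the non-lazy behavior shown (staying subscribed after the last subscription ends) is inferred from the note rather than confirmed.

```typescript
// Illustrative model only: the class and its members are hypothetical
// and do not reflect the library's internals.
class ChannelModel {
  private activeSubscriptions = 0;
  private isLazy: boolean;
  subscribedToRedis = false;

  constructor(isLazy: boolean = true) {
    this.isLazy = isLazy;
  }

  // Called when a `for await` loop starts consuming the channel.
  addSubscription(): void {
    this.activeSubscriptions += 1;
    this.subscribedToRedis = true;
  }

  // Called when a loop ends, for example after its AbortSignal fires.
  removeSubscription(): void {
    this.activeSubscriptions = Math.max(0, this.activeSubscriptions - 1);
    if (this.isLazy && this.activeSubscriptions === 0) {
      this.subscribedToRedis = false;
    }
  }
}

const lazy = new ChannelModel();
lazy.addSubscription();
lazy.addSubscription();
lazy.removeSubscription(); // one subscription left, still subscribed
const stillSubscribed = lazy.subscribedToRedis;
lazy.removeSubscription(); // last one gone, the lazy channel unsubscribes
const afterLast = lazy.subscribedToRedis;

// Assumed non-lazy behavior: the Redis subscription is kept.
const eager = new ChannelModel(false);
eager.addSubscription();
eager.removeSubscription();
const eagerAfterLast = eager.subscribedToRedis;
```

In this model, aborting one of several subscriptions leaves the Redis subscription in place, and only the final abort on a lazy channel releases it.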
Unsubscribe specific identifiers
await userChannel.unsubscribe(
{
identifier: 1,
},
{
identifier: 2,
}
);
Unsubscribe an entire channel
await userChannel.unsubscribeAll();
Close the PubSub instance
const pubSub = RedisPubSub({
publisher: new Redis({
port: 6379,
}),
subscriber: new Redis({
port: 6379,
}),
});
await pubSub.close();