Version 5.5.1 (latest) on npm. Weekly downloads: 1.6K. Maintainers: 2.

@jetit/publisher

@jetit/publisher is a robust and feature-rich library for implementing an event-driven architecture using Redis PUB/SUB and Redis Streams. It provides a scalable mechanism for publishing and consuming events in real-time, with support for advanced features such as message deduplication, consumer group management, scheduled event publishing, and more.

Installation

npm install @jetit/publisher

Key Features

  • Real-time event publishing and subscribing
  • Configurable Streams class for flexible usage
  • Improved error handling and reliability
  • Performance tracking with Redis time and operation time metrics
  • Dead Letter Queue (DLQ) for handling subscription failures
  • Event filtering for specialized subscriptions
  • Support for multiple event subscriptions from the same service
  • Batch publishing (regular and scheduled)
  • Basic monitoring with Prometheus export support
  • Content-based publish-once guarantee (0-1 semantics: each unique event is processed at most once)
  • Optimized cleanup processes for improved performance
  • Circuit Breaker pattern for fault tolerance

Usage

Basic Example

import { Publisher, EventData } from '@jetit/publisher';

// Create an instance of the publisher
const publisher = new Publisher('MyService');

// Publish an event
const eventData: EventData<{ message: string }> = {
    eventName: 'my-event',
    data: { message: 'Hello, world!' }
};

await publisher.publish(eventData);

// Subscribe to an event
publisher.listen('my-event').subscribe(event => {
    console.log(`Received event: ${event.eventName}`, event.data);
});

Configuration

The Publisher class can be configured with various options, including Circuit Breaker and Backpressure handling:

import { Publisher, IStreamsConfig } from '@jetit/publisher';

const config: Partial<IStreamsConfig> = {
    cleanUpInterval: 3600000, // 1 hour
    maxRetries: 5,
    initialRetryDelay: 1000,
    immediatePublishThreshold: 500,
    unprocessedMessageThreshold: 25,
    acknowledgedMessageCleanupInterval: 3600000, // 1 hour
    dlqEventThreshold: 2000,
    filterKeepAlive: 86400000, // 24 hours
    duplicationCheckWindow: 86400, // 24 hours (86400 matches 24 hours only in seconds, unlike the millisecond values above)
    circuitBreaker: {
        enabled: true,
        errorThreshold: 50,
        errorThresholdPercentage: 50,
        openStateDuration: 30000, // 30s
        halfOpenStateMaxAttempts: 10,
        maxStoredEvents: 5000,
    },
};

const publisher = new Publisher('MyService', config);
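
The configuration above mixes units: most durations are in milliseconds, while the 24-hour comment on duplicationCheckWindow only matches 86400 if that field is in seconds. As a minimal sketch, helper functions like the following can make such values easier to read and verify. The helper names are hypothetical and are not exported by @jetit/publisher.

```typescript
// Hypothetical helpers, not part of @jetit/publisher.
const MS_PER_SECOND = 1000;
const SECONDS_PER_HOUR = 3600;

/** Hours expressed in milliseconds (e.g. for cleanUpInterval). */
function hoursInMs(hours: number): number {
    return hours * SECONDS_PER_HOUR * MS_PER_SECOND;
}

/** Hours expressed in seconds (assumed unit of duplicationCheckWindow). */
function hoursInSeconds(hours: number): number {
    return hours * SECONDS_PER_HOUR;
}

const durations = {
    cleanUpInterval: hoursInMs(1),              // 3600000
    filterKeepAlive: hoursInMs(24),             // 86400000
    duplicationCheckWindow: hoursInSeconds(24), // 86400
};
```

These values could then be spread into the Partial<IStreamsConfig> object in place of the numeric literals.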

Publishing Events

const eventData = {
    eventName: 'user-registered',
    data: { userId: '123', email: 'user@example.com' }
};

await publisher.publish(eventData);

Subscribing to Events

// Basic subscription with automatic acknowledgment
publisher.listen('user-registered').subscribe(event => {
    console.log('New user registered:', event.data);
});

// Subscription with external acknowledgment
const options = {
    externalAcknowledgement: true
};

publisher.listen('user-registered', options).subscribe(async event => {
    try {
        console.log('New user registered:', event.data);
        // Process the event (processUserRegistration is an application-defined async handler)
        await processUserRegistration(event.data);
        
        // Manually acknowledge the message after successful processing
        await publisher.acknowledgeMessage(event.ackKey);
    } catch (error) {
        // Handle error - message will not be acknowledged and will be reprocessed
        console.error('Failed to process user registration:', error);
    }
});

The externalAcknowledgement option allows you to manually control when messages are acknowledged. This is useful when:

  • You need to ensure message processing is complete before acknowledgment
  • You want to implement custom retry logic
  • You need to coordinate acknowledgment with other operations
  • You want to implement transaction-like behavior

When externalAcknowledgement is set to true:

  • Messages won't be automatically acknowledged after delivery
  • Each message contains an ackKey that must be used to acknowledge it
  • Unacknowledged messages will be redelivered to other consumers
  • You must explicitly call acknowledgeMessage(event.ackKey) after successful processing

Note: Be careful with external acknowledgment as failing to acknowledge messages can lead to message redelivery and potential duplicate processing.
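
The acknowledgment rules above can be illustrated with a small in-memory model. This is only a sketch of the idea (a message stays pending until its ackKey is acknowledged); the class and method names are hypothetical, and it does not reflect how @jetit/publisher tracks pending messages in Redis.

```typescript
// In-memory model of manual acknowledgment. Hypothetical names; this is
// not how @jetit/publisher is implemented internally.
interface PendingMessage<T> {
    ackKey: string;
    data: T;
}

class PendingList<T> {
    private pending = new Map<string, PendingMessage<T>>();
    private nextId = 0;

    /** Deliver a message and remember it until it is acknowledged. */
    deliver(data: T): PendingMessage<T> {
        const message: PendingMessage<T> = { ackKey: `msg-${this.nextId++}`, data };
        this.pending.set(message.ackKey, message);
        return message;
    }

    /** Returns true if the key was pending and is now acknowledged. */
    acknowledge(ackKey: string): boolean {
        return this.pending.delete(ackKey);
    }

    /** Messages that would still be eligible for redelivery. */
    unacknowledged(): PendingMessage<T>[] {
        return Array.from(this.pending.values());
    }
}

const list = new PendingList<{ userId: string }>();
const processed = list.deliver({ userId: '1' });
const failed = list.deliver({ userId: '2' });

list.acknowledge(processed.ackKey); // processing succeeded
// `failed` is never acknowledged, so it stays pending.
const redeliverable = list.unacknowledged().map(m => m.data.userId);
```

Because anything left unacknowledged can come back, handlers used with externalAcknowledgement are safest when they are idempotent.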

Scheduled Publishing

const futureDate = new Date(Date.now() + 60000); // 1 minute from now
await publisher.scheduledPublish(futureDate, eventData);
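
Since scheduledPublish takes a Date in the example above, it can help to compute that Date with small helpers. The following is a sketch with hypothetical helper names (not part of the library): one for a relative delay and one for the next occurrence of a given UTC clock time.

```typescript
// Hypothetical helpers for building the Date passed to scheduledPublish.
const MS_PER_DAY = 24 * 60 * 60 * 1000;

/** A Date `delayMs` milliseconds after `from` (defaults to now). */
function afterDelay(delayMs: number, from: Date = new Date()): Date {
    return new Date(from.getTime() + delayMs);
}

/** The next time the UTC clock reads hour:minute, strictly after `from`. */
function nextUtcTime(hour: number, minute: number, from: Date = new Date()): Date {
    const candidate = Date.UTC(
        from.getUTCFullYear(), from.getUTCMonth(), from.getUTCDate(), hour, minute,
    );
    // If that time has already passed today, use the same time tomorrow.
    const target = candidate > from.getTime() ? candidate : candidate + MS_PER_DAY;
    return new Date(target);
}

const reference = new Date(Date.UTC(2025, 0, 30, 12, 0, 0));
const inOneMinute = afterDelay(60000, reference); // 2025-01-30T12:01:00.000Z
const laterToday = nextUtcTime(13, 0, reference); // 2025-01-30T13:00:00.000Z
const tomorrow = nextUtcTime(9, 30, reference);   // 2025-01-31T09:30:00.000Z
```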

Batch Publishing

import { publishBatch } from '@jetit/publisher';

const events = [
    { eventName: 'event1', data: { /* ... */ } },
    { eventName: 'event2', data: { /* ... */ } },
    // ...
];

const result = await publishBatch(publisher, events, { batchSize: 100, delayBetweenBatches: 1000 });
console.log('Batch publish result:', result);
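
To make the batchSize option concrete, here is a generic chunking sketch. It shows how a list of events would be split into groups of at most batchSize items; it is an illustration under that assumption, not the code publishBatch actually runs.

```typescript
// Generic chunking helper: a sketch of what a batchSize option implies,
// not the library's internal code.
function chunk<T>(items: readonly T[], batchSize: number): T[][] {
    if (!Number.isInteger(batchSize) || batchSize <= 0) {
        throw new RangeError('batchSize must be a positive integer');
    }
    const batches: T[][] = [];
    for (let i = 0; i < items.length; i += batchSize) {
        batches.push(items.slice(i, i + batchSize));
    }
    return batches;
}

// 250 placeholder events split with batchSize = 100.
const sampleEvents = Array.from({ length: 250 }, (_, i) => ({
    eventName: 'event1',
    data: { index: i },
}));
const batchSizes = chunk(sampleEvents, 100).map(batch => batch.length); // [100, 100, 50]
```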

Dead Letter Queue (DLQ)

// Retry an event from DLQ
const success = await publisher.retryFromDLQ('eventId');

// Get DLQ stats
const stats = await publisher.getDLQStats();
console.log('DLQ stats:', stats);
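
The following in-memory model sketches the general dead letter queue idea behind the calls above: failed events are parked by id, and a retry removes the entry only when the handler succeeds. All names are hypothetical, and the real library keeps this state in Redis, so treat it as a conceptual aid rather than a description of retryFromDLQ.

```typescript
// In-memory model of a dead letter queue. Hypothetical names; not the
// library's internals.
interface FailedEvent<T> {
    eventId: string;
    data: T;
    attempts: number;
}

class DeadLetterQueue<T> {
    private entries = new Map<string, FailedEvent<T>>();

    add(event: FailedEvent<T>): void {
        this.entries.set(event.eventId, event);
    }

    /** Re-run the handler; remove the entry only if it succeeds. */
    retry(eventId: string, handler: (data: T) => void): boolean {
        const entry = this.entries.get(eventId);
        if (entry === undefined) return false;
        try {
            handler(entry.data);
            this.entries.delete(eventId);
            return true;
        } catch (_error) {
            entry.attempts++; // still failing, keep it in the queue
            return false;
        }
    }

    stats(): { size: number } {
        return { size: this.entries.size };
    }
}

const dlq = new DeadLetterQueue<{ userId: string }>();
dlq.add({ eventId: 'evt-1', data: { userId: '123' }, attempts: 5 });
const failedAgain = dlq.retry('evt-1', () => { throw new Error('still failing'); });
const recovered = dlq.retry('evt-1', () => {});
```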

Event Filtering

const options = {
    eventFilter: (event) => event.data.userId === '123',
    filterKeepAlive: 3600000 // 1 hour
};

publisher.listen('user-action', options).subscribe(event => {
    console.log('Filtered user action:', event);
});
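
When a filter grows beyond a single comparison, it can be assembled from smaller predicates. This sketch assumes the eventFilter option accepts any function from an event to a boolean, as the arrow function above suggests; the event shape and helper names here are hypothetical.

```typescript
// Hypothetical event shape and helpers for building eventFilter predicates.
interface UserActionEvent {
    eventName: string;
    data: { userId: string; action: string };
}

type EventFilter<E> = (event: E) => boolean;

/** Combine predicates so an event must satisfy all of them. */
function allOf<E>(...filters: EventFilter<E>[]): EventFilter<E> {
    return event => filters.every(filter => filter(event));
}

const forUser = (userId: string): EventFilter<UserActionEvent> =>
    event => event.data.userId === userId;
const forAction = (action: string): EventFilter<UserActionEvent> =>
    event => event.data.action === action;

// Matches only login actions by user 123.
const eventFilter = allOf(forUser('123'), forAction('login'));
```

Small named predicates are also easy to unit test without a Redis connection.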

Performance Monitoring

// Get metrics for a specific time range (startTime and endTime are supplied by the caller)
const metrics = await publisher.getMetrics(startTime, endTime);
console.log('Performance metrics:', metrics);

// Get latest metrics
const latestMetrics = await publisher.getLatestMetrics();
console.log('Latest metrics:', latestMetrics);

Prometheus Integration

import { PrometheusAdapter } from '@jetit/publisher';
import promClient from 'prom-client';
import express from 'express';

const app = express();
const prometheusAdapter = new PrometheusAdapter(publisher, promClient);

prometheusAdapter.setupEndpoint(app, '/metrics');

app.listen(3000, () => {
    console.log('Metrics server listening on port 3000');
});

Advanced Features

Content-Based Deduplication

The library supports content-based deduplication to ensure that each unique event is processed only once:

const options = {
    publishOnceGuarantee: true
};

publisher.listen('important-event', options).subscribe(event => {
    console.log('Guaranteed unique event:', event);
});
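
As a rough illustration of content-based deduplication, the sketch below derives a key from the event name and a canonical form of its data, then rejects repeats seen within a time window. The names are hypothetical and the approach is an assumption: a production implementation would more likely hash the key and store it in Redis with an expiry.

```typescript
// Sketch of content-based deduplication with a time window.
// Hypothetical names; not the library's implementation.

/** Serialize a value with object keys sorted, so key order does not matter. */
function canonicalize(value: unknown): string {
    if (Array.isArray(value)) {
        return `[${value.map(canonicalize).join(',')}]`;
    }
    if (value !== null && typeof value === 'object') {
        const record = value as Record<string, unknown>;
        const entries = Object.keys(record)
            .sort()
            .map(key => `${JSON.stringify(key)}:${canonicalize(record[key])}`);
        return `{${entries.join(',')}}`;
    }
    return JSON.stringify(value);
}

class Deduplicator {
    private seen = new Map<string, number>(); // key -> first-seen time (ms)
    private readonly windowMs: number;

    constructor(windowMs: number) {
        this.windowMs = windowMs;
    }

    /** Returns true the first time this content is seen inside the window. */
    shouldProcess(eventName: string, data: unknown, nowMs: number): boolean {
        const key = `${eventName}:${canonicalize(data)}`;
        const firstSeen = this.seen.get(key);
        if (firstSeen !== undefined && nowMs - firstSeen < this.windowMs) {
            return false; // duplicate within the window
        }
        this.seen.set(key, nowMs);
        return true;
    }
}

const dedup = new Deduplicator(86400000); // 24 hours in ms
const first = dedup.shouldProcess('important-event', { a: 1, b: 2 }, 0);
const repeat = dedup.shouldProcess('important-event', { b: 2, a: 1 }, 1000);
const afterWindow = dedup.shouldProcess('important-event', { a: 1, b: 2 }, 86400000);
```

Here `repeat` is rejected even though its keys are in a different order, and `afterWindow` is accepted because the window has elapsed.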

Multiple Event Subscriptions

You can subscribe to multiple events from the same service:

const subscription1 = publisher.listen('event1').subscribe(/* ... */);
const subscription2 = publisher.listen('event2').subscribe(/* ... */);

Circuit Breaker

The Circuit Breaker pattern is implemented to prevent cascading failures in a distributed system. It helps to gracefully handle failures and allows the system to recover without overwhelming failed services.

Configuration options:

  • enabled: Enable or disable the Circuit Breaker.
  • errorThreshold: Number of errors before opening the circuit.
  • errorThresholdPercentage: Percentage of errors to total calls before opening the circuit.
  • timeWindow: Time window for error rate calculation (in milliseconds).
  • openStateDuration: Duration to keep the circuit open before moving to half-open state (in milliseconds).
  • halfOpenStateMaxAttempts: Maximum number of attempts allowed in half-open state.

The Circuit Breaker has three states:

  • Closed: Normal operation, calls pass through.
  • Open: Calls are immediately rejected without reaching the service.
  • Half-Open: A limited number of calls are allowed to test if the service has recovered.

Performance Optimizations

  • Batched xdel operations for improved cleanup performance
  • Configurable cleanup intervals and thresholds
  • Efficient event filtering at the subscription level
  • Retry logic with exponential backoff for failed operations
  • Circuit Breaker to prevent overwhelming failed services
  • Dead Letter Queue (DLQ) for handling subscription failures
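
To illustrate the exponential backoff mentioned in the list above, this sketch computes a retry schedule from maxRetries and initialRetryDelay. The doubling factor is an assumption: the source names those two options but does not state the multiplier or whether jitter is applied.

```typescript
// Sketch of exponential backoff delays. Doubling per attempt is an
// assumption, not a documented behavior of the library.
function backoffDelays(maxRetries: number, initialRetryDelay: number, factor = 2): number[] {
    const delays: number[] = [];
    for (let attempt = 0; attempt < maxRetries; attempt++) {
        delays.push(initialRetryDelay * Math.pow(factor, attempt));
    }
    return delays;
}

// With maxRetries: 5 and initialRetryDelay: 1000 ms, as in the configuration example:
const retrySchedule = backoffDelays(5, 1000); // [1000, 2000, 4000, 8000, 16000]
```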

Cleanup and Graceful Shutdown

To ensure proper cleanup of resources, implement a graceful shutdown:

process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);

async function shutdown() {
    console.log('Graceful shutdown initiated.');
    let exitCode = 0;
    try {
        await publisher.close();
        console.log('Resources and connections successfully closed.');
    } catch (error) {
        console.error('Error during graceful shutdown:', error);
        exitCode = 1; // report the failed shutdown to the parent process
    }
    process.exit(exitCode);
}
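
If closing connections can hang (for example, when Redis is unreachable), it may be worth bounding the shutdown with a timeout. The helper below is a hypothetical sketch, not part of @jetit/publisher; it rejects when the wrapped promise does not settle in time.

```typescript
// Hypothetical helper: reject if `work` does not settle within `ms`.
function withTimeout<T>(work: Promise<T>, ms: number): Promise<T> {
    return new Promise<T>((resolve, reject) => {
        const timer = setTimeout(
            () => reject(new Error(`Timed out after ${ms} ms`)),
            ms,
        );
        work.then(
            value => { clearTimeout(timer); resolve(value); },
            error => { clearTimeout(timer); reject(error); },
        );
    });
}
```

Inside the shutdown function, `await publisher.close()` could then become `await withTimeout(publisher.close(), 10000)`, where the 10-second limit is an arbitrary example value.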

Troubleshooting

If you encounter issues:

  • Check the Redis connection settings
  • Verify that consumer groups are correctly created
  • Monitor the DLQ for failed events
  • Review the performance metrics for any anomalies
  • Check the logs for detailed error messages

Package last updated on 30 Jan 2025
