@jetit/publisher
@jetit/publisher is a library for implementing an event-driven architecture on Redis PUB/SUB and Redis Streams. It provides a scalable mechanism for publishing and consuming events in real time, with support for message deduplication, consumer group management, scheduled and batch publishing, a Dead Letter Queue, and a Circuit Breaker.
Installation
npm install @jetit/publisher
Key Features
- Real-time event publishing and subscribing
- Configurable Streams class for flexible usage
- Improved error handling and reliability
- Performance tracking with Redis time and operation time metrics
- Dead Letter Queue (DLQ) for handling subscription failures
- Event filtering for specialized subscriptions
- Support for multiple event subscriptions from the same service
- Batch publishing (regular and scheduled)
- Basic monitoring with Prometheus export support
- Content-based one-time guarantee (0-1 semantics support)
- Optimized cleanup processes for improved performance
- Circuit Breaker pattern for fault tolerance
Usage
Basic Example
import { Publisher, EventData } from '@jetit/publisher';
const publisher = new Publisher('MyService');
const eventData: EventData<{ message: string }> = {
eventName: 'my-event',
data: { message: 'Hello, world!' }
};
await publisher.publish(eventData);
publisher.listen('my-event').subscribe(event => {
console.log(`Received event: ${event.eventName}`, event.data);
});
Configuration
The Publisher class can be configured with various options, including Circuit Breaker and Backpressure handling:
import { Publisher, IStreamsConfig } from '@jetit/publisher';
const config: Partial<IStreamsConfig> = {
cleanUpInterval: 3600000,
maxRetries: 5,
initialRetryDelay: 1000,
immediatePublishThreshold: 500,
unprocessedMessageThreshold: 25,
acknowledgedMessageCleanupInterval: 3600000,
dlqEventThreshold: 2000,
filterKeepAlive: 86400000,
duplicationCheckWindow: 86400,
circuitBreaker: {
enabled: true,
errorThreshold: 50,
errorThresholdPercentage: 50,
openStateDuration: 30000,
halfOpenStateMaxAttempts: 10,
maxStoredEvents: 5000,
},
};
const publisher = new Publisher('MyService', config);
Publishing Events
const eventData = {
eventName: 'user-registered',
data: { userId: '123', email: 'user@example.com' }
};
await publisher.publish(eventData);
Subscribing to Events
publisher.listen('user-registered').subscribe(event => {
console.log('New user registered:', event.data);
});
const options = {
externalAcknowledgement: true
};
publisher.listen('user-registered', options).subscribe(async event => {
try {
console.log('New user registered:', event.data);
await processUserRegistration(event.data);
await publisher.acknowledgeMessage(event.ackKey);
} catch (error) {
console.error('Failed to process user registration:', error);
}
});
The externalAcknowledgement option allows you to manually control when messages are acknowledged. This is useful when:
- You need to ensure message processing is complete before acknowledgment
- You want to implement custom retry logic
- You need to coordinate acknowledgment with other operations
- You want to implement transaction-like behavior
When externalAcknowledgement is set to true:
- Messages won't be automatically acknowledged after delivery
- Each message contains an ackKey that must be used to acknowledge it
- Unacknowledged messages will be redelivered to other consumers
- You must explicitly call acknowledgeMessage(event.ackKey) after successful processing
Note: Be careful with external acknowledgment as failing to acknowledge messages can lead to message redelivery and potential duplicate processing.
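Because redelivery can cause the same message to be handled twice, handlers are often written to be idempotent. The following is a minimal sketch of one common approach, not part of @jetit/publisher: `ProcessedKeys` and `markIfNew` are hypothetical names, and the key is assumed to be a business identifier chosen by the caller (for example a user ID). It only remembers keys within a single process.

```typescript
// Remembers recently processed keys so a redelivered message can be skipped.
// Hypothetical helper for illustration; state is in memory and per process.
class ProcessedKeys {
  private readonly seen = new Set<string>();
  private readonly maxSize: number;

  constructor(maxSize: number = 10000) {
    this.maxSize = maxSize;
  }

  // Returns true the first time a key is seen, false on repeats.
  markIfNew(key: string): boolean {
    if (this.seen.has(key)) return false;
    if (this.seen.size >= this.maxSize) {
      // Evict the oldest key; a Set iterates in insertion order.
      const oldest = this.seen.values().next().value;
      if (oldest !== undefined) this.seen.delete(oldest);
    }
    this.seen.add(key);
    return true;
  }
}
```

Inside a subscriber, a handler could call `markIfNew` with its chosen key and skip the work (while still acknowledging) when it returns false.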
Scheduled Publishing
const futureDate = new Date(Date.now() + 60000);
await publisher.scheduledPublish(futureDate, eventData);
Batch Publishing
import { publishBatch } from '@jetit/publisher';
const events = [
{ eventName: 'event1', data: { } },
{ eventName: 'event2', data: { } },
];
const result = await publishBatch(publisher, events, { batchSize: 100, delayBetweenBatches: 1000 });
console.log('Batch publish result:', result);
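To make the batchSize option concrete, the sketch below shows the general technique of splitting a list into fixed-size batches. `chunk` is a hypothetical helper written for illustration; how publishBatch groups events internally is not described here and may differ.

```typescript
// Splits items into consecutive batches of at most batchSize elements.
function chunk<T>(items: readonly T[], batchSize: number): T[][] {
  if (!Number.isInteger(batchSize) || batchSize < 1) {
    throw new RangeError('batchSize must be a positive integer');
  }
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += batchSize) {
    batches.push(items.slice(i, i + batchSize));
  }
  return batches;
}

// Five items with a batch size of 2 give three batches: [1, 2], [3, 4], [5].
const batches = chunk([1, 2, 3, 4, 5], 2);
console.log(batches.length); // 3
```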
Dead Letter Queue (DLQ)
const success = await publisher.retryFromDLQ('eventId');
const stats = await publisher.getDLQStats();
console.log('DLQ stats:', stats);
Event Filtering
const options = {
eventFilter: (event) => event.data.userId === '123',
filterKeepAlive: 3600000
};
publisher.listen('user-action', options).subscribe(event => {
console.log('Filtered user action:', event);
});
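When a subscription needs more than one condition, small predicates can be combined into a single function of the same shape as the eventFilter above. This is a sketch under assumptions: `UserActionEvent`, `EventFilter`, `allOf`, `forUser`, and `forAction` are hypothetical names, and the event shape is invented for illustration.

```typescript
// Hypothetical event shape for illustration only.
interface UserActionEvent {
  eventName: string;
  data: { userId?: string; action?: string };
}

type EventFilter<E> = (event: E) => boolean;

// Combines filters so an event passes only when every filter accepts it.
function allOf<E>(...filters: EventFilter<E>[]): EventFilter<E> {
  return (event) => filters.every((filter) => filter(event));
}

const forUser = (userId: string): EventFilter<UserActionEvent> =>
  (event) => event.data.userId === userId;

const forAction = (action: string): EventFilter<UserActionEvent> =>
  (event) => event.data.action === action;

// Accepts only login actions performed by user 123.
const combinedFilter = allOf(forUser('123'), forAction('login'));
```

Keeping each predicate small makes it easy to test on its own before it is attached to a subscription.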
Performance Monitoring
// startTime and endTime bound the query window and must be defined by the caller
const metrics = await publisher.getMetrics(startTime, endTime);
console.log('Performance metrics:', metrics);
const latestMetrics = await publisher.getLatestMetrics();
console.log('Latest metrics:', latestMetrics);
Prometheus Integration
import { PrometheusAdapter } from '@jetit/publisher';
import promClient from 'prom-client';
import express from 'express';
const app = express();
const prometheusAdapter = new PrometheusAdapter(publisher, promClient);
prometheusAdapter.setupEndpoint(app, '/metrics');
app.listen(3000, () => {
console.log('Metrics server listening on port 3000');
});
Advanced Features
Content-Based Deduplication
The library supports content-based deduplication to ensure that each unique event is processed only once:
const options = {
publishOnceGuarantee: true
};
publisher.listen('important-event', options).subscribe(event => {
console.log('Guaranteed unique event:', event);
});
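Content-based deduplication generally relies on a key derived from the event's content, so that two events with the same payload map to the same key. The sketch below illustrates that idea with a SHA-256 hash over a canonical serialization. It is an assumption about the general technique, not the library's actual algorithm; `canonicalize` and `contentKey` are hypothetical names.

```typescript
import { createHash } from 'node:crypto';

// Serializes a value with object keys sorted, so key order does not change the result.
// Values JSON cannot represent (undefined, functions) are written as null.
function canonicalize(value: unknown): string {
  if (value === null || typeof value !== 'object') {
    return JSON.stringify(value) ?? 'null';
  }
  if (Array.isArray(value)) {
    return `[${value.map((item) => canonicalize(item)).join(',')}]`;
  }
  const record = value as Record<string, unknown>;
  const entries = Object.keys(record)
    .sort()
    .map((key) => `${JSON.stringify(key)}:${canonicalize(record[key])}`);
  return `{${entries.join(',')}}`;
}

// Derives a stable hex key from the event name and its payload.
function contentKey(eventName: string, data: unknown): string {
  return createHash('sha256')
    .update(`${eventName}\n${canonicalize(data)}`)
    .digest('hex');
}
```

With a key like this, `contentKey('e', { a: 1, b: 2 })` and `contentKey('e', { b: 2, a: 1 })` are equal, while any change to the event name or payload yields a different key.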
Multiple Event Subscriptions
You can subscribe to multiple events from the same service:
const subscription1 = publisher.listen('event1').subscribe();
const subscription2 = publisher.listen('event2').subscribe();
Circuit Breaker
The Circuit Breaker pattern is implemented to prevent cascading failures in a distributed system. It helps to gracefully handle failures and allows the system to recover without overwhelming failed services.
Configuration options:
- enabled: Enable or disable the Circuit Breaker.
- errorThreshold: Number of errors before opening the circuit.
- errorThresholdPercentage: Percentage of errors to total calls before opening the circuit.
- timeWindow: Time window for error rate calculation (in milliseconds).
- openStateDuration: Duration to keep the circuit open before moving to half-open state (in milliseconds).
- halfOpenStateMaxAttempts: Maximum number of attempts allowed in half-open state.
The Circuit Breaker has three states:
- Closed: Normal operation, calls pass through.
- Open: Calls are immediately rejected without reaching the service.
- Half-Open: A limited number of calls are allowed to test if the service has recovered.
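The three states and their transitions can be sketched as a small state machine. This is a simplified illustration, not the library's implementation: `CircuitBreakerSketch` and its methods are hypothetical, it counts errors only against errorThreshold (ignoring errorThresholdPercentage and timeWindow), and it assumes that any success closes the circuit while a failure in half-open state reopens it.

```typescript
type CircuitState = 'closed' | 'open' | 'half-open';

interface BreakerOptions {
  errorThreshold: number;           // errors before the circuit opens
  openStateDuration: number;        // ms to stay open before half-open
  halfOpenStateMaxAttempts: number; // trial calls allowed while half-open
}

class CircuitBreakerSketch {
  private state: CircuitState = 'closed';
  private errors = 0;
  private openedAt = 0;
  private halfOpenAttempts = 0;
  private readonly options: BreakerOptions;
  private readonly now: () => number;

  // The clock is injectable so the transitions can be tested without waiting.
  constructor(options: BreakerOptions, now: () => number = () => Date.now()) {
    this.options = options;
    this.now = now;
  }

  // Returns true if a call may proceed; moves open -> half-open once the wait has elapsed.
  allowRequest(): boolean {
    if (this.state === 'open') {
      if (this.now() - this.openedAt < this.options.openStateDuration) return false;
      this.state = 'half-open';
      this.halfOpenAttempts = 0;
    }
    if (this.state === 'half-open') {
      if (this.halfOpenAttempts >= this.options.halfOpenStateMaxAttempts) return false;
      this.halfOpenAttempts += 1;
    }
    return true;
  }

  // Assumption: a success in any state closes the circuit and resets the count.
  recordSuccess(): void {
    this.state = 'closed';
    this.errors = 0;
  }

  // Opens the circuit at the threshold, or immediately on a half-open failure.
  recordFailure(): void {
    this.errors += 1;
    if (this.state === 'half-open' || this.errors >= this.options.errorThreshold) {
      this.state = 'open';
      this.openedAt = this.now();
    }
  }

  getState(): CircuitState {
    return this.state;
  }
}
```

A caller would check `allowRequest()` before each operation and report the outcome with `recordSuccess()` or `recordFailure()`.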
Performance Optimizations
- Batched xdel operations for improved cleanup performance
- Configurable cleanup intervals and thresholds
- Efficient event filtering at the subscription level
- Retry logic with exponential backoff for failed operations
- Circuit Breaker to prevent overwhelming failed services
- Dead Letter Queue (DLQ) for handling subscription failures
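As an illustration of retry with exponential backoff, the sketch below computes the delay before each retry from values such as maxRetries and initialRetryDelay. The doubling factor, the 60-second cap, and the names `backoffDelay` and `backoffSchedule` are assumptions made for this example; the library's actual schedule is not specified here.

```typescript
// Delay before retry number `attempt` (1-based): the initial delay doubled each time, capped.
function backoffDelay(attempt: number, initialRetryDelay: number, maxDelay: number = 60000): number {
  if (!Number.isInteger(attempt) || attempt < 1) {
    throw new RangeError('attempt must be a positive integer');
  }
  return Math.min(initialRetryDelay * 2 ** (attempt - 1), maxDelay);
}

// Full list of delays for a given number of retries.
function backoffSchedule(maxRetries: number, initialRetryDelay: number): number[] {
  return Array.from({ length: maxRetries }, (_, i) => backoffDelay(i + 1, initialRetryDelay));
}

// With maxRetries = 5 and initialRetryDelay = 1000 the delays are
// 1000, 2000, 4000, 8000 and 16000 ms.
const schedule = backoffSchedule(5, 1000);
console.log(schedule.join(', '));
```

Production retry loops often add random jitter to each delay so that many clients do not retry at the same instant.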
Cleanup and Graceful Shutdown
To ensure proper cleanup of resources, implement a graceful shutdown:
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);
async function shutdown() {
console.log('Graceful shutdown initiated.');
let exitCode = 0;
try {
await publisher.close();
console.log('Resources and connections successfully closed.');
} catch (error) {
console.error('Error during graceful shutdown:', error);
exitCode = 1; // report the failed cleanup to the parent process
}
process.exit(exitCode);
}
Troubleshooting
If you encounter issues:
- Check the Redis connection settings
- Verify that consumer groups are correctly created
- Monitor the DLQ for failed events
- Review the performance metrics for any anomalies
- Check the logs for detailed error messages