
@jetit/publisher
`@jetit/publisher` is a robust and feature-rich library for implementing an event-driven architecture using Redis Pub/Sub and Redis Streams. It provides a scalable mechanism for publishing and consuming events in real time, with support for advanced features such as message deduplication, consumer group management, and scheduled event publishing.
npm install @jetit/publisher
import { Publisher, EventData } from '@jetit/publisher';
// Create an instance of the publisher
const publisher = new Publisher('MyService');
// Publish an event
const eventData: EventData<{ message: string }> = {
    eventName: 'my-event',
    data: { message: 'Hello, world!' }
};
await publisher.publish(eventData);
// Subscribe to an event
publisher.listen('my-event').subscribe(event => {
    console.log(`Received event: ${event.eventName}`, event.data);
});
The `Publisher` class can be configured with various options, including Circuit Breaker and Backpressure handling:
import { Publisher, IStreamsConfig } from '@jetit/publisher';
const config: Partial<IStreamsConfig> = {
    cleanUpInterval: 3600000, // 1 hour
    maxRetries: 5,
    initialRetryDelay: 1000,
    immediatePublishThreshold: 500,
    unprocessedMessageThreshold: 25,
    acknowledgedMessageCleanupInterval: 3600000, // 1 hour
    dlqEventThreshold: 2000,
    filterKeepAlive: 86400000, // 24 hours
    duplicationCheckWindow: 86400, // 24 hours
    circuitBreaker: {
        enabled: true,
        errorThreshold: 50,
        errorThresholdPercentage: 50,
        openStateDuration: 30000, // 30s
        halfOpenStateMaxAttempts: 10,
        maxStoredEvents: 5000,
    },
};
const publisher = new Publisher('MyService', config);
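The retry settings above (`maxRetries`, `initialRetryDelay`) suggest a delay that grows between attempts. This README does not state which schedule the library uses, so the sketch below assumes plain exponential doubling purely for illustration; `retryDelays` is a hypothetical helper and not part of `@jetit/publisher`.

```typescript
// Hypothetical helper: NOT part of @jetit/publisher.
// Assumption: the delay doubles after every failed attempt (exponential backoff).
function retryDelays(maxRetries: number, initialRetryDelay: number): number[] {
    const delays: number[] = [];
    for (let attempt = 0; attempt < maxRetries; attempt++) {
        delays.push(initialRetryDelay * 2 ** attempt);
    }
    return delays;
}

// With the values from the configuration above: 5 retries starting at 1000 ms.
const delays = retryDelays(5, 1000);
console.log(delays.join(', ')); // 1000, 2000, 4000, 8000, 16000
```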
const eventData = {
    eventName: 'user-registered',
    data: { userId: '123', email: 'user@example.com' }
};
await publisher.publish(eventData);
// Basic subscription with automatic acknowledgment
publisher.listen('user-registered').subscribe(event => {
    console.log('New user registered:', event.data);
});
// Subscription with external acknowledgment
const options = {
    externalAcknowledgement: true
};
publisher.listen('user-registered', options).subscribe(async event => {
    try {
        console.log('New user registered:', event.data);
        // Process the event
        await processUserRegistration(event.data);
        // Manually acknowledge the message after successful processing
        await publisher.acknowledgeMessage(event.ackKey);
    } catch (error) {
        // Handle error - message will not be acknowledged and will be reprocessed
        console.error('Failed to process user registration:', error);
    }
});
The `externalAcknowledgement` option allows you to manually control when messages are acknowledged, for example when a message should only be acknowledged after it has been processed successfully.
When `externalAcknowledgement` is set to `true`:
- Each event includes an `ackKey` that must be used to acknowledge it
- Call `acknowledgeMessage(event.ackKey)` after successful processing
Note: Be careful with external acknowledgment, as failing to acknowledge messages can lead to message redelivery and potential duplicate processing.
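Because unacknowledged messages can be redelivered, handlers used with external acknowledgment benefit from being idempotent. The following is a minimal sketch of that idea, assuming each event carries a stable unique identifier. The `eventId` field and the `IdempotentHandler` class are hypothetical and not part of the library; only `ackKey` appears in this README.

```typescript
// Hypothetical event shape for illustration; `eventId` is an assumption.
interface IncomingEvent<T> {
    eventId: string; // assumed to be unique per logical event
    ackKey: string;
    data: T;
}

class IdempotentHandler<T> {
    private readonly processed = new Set<string>();
    private readonly handle: (data: T) => void;

    constructor(handle: (data: T) => void) {
        this.handle = handle;
    }

    // Returns true when the handler ran, false when the event was a redelivery.
    process(event: IncomingEvent<T>): boolean {
        if (this.processed.has(event.eventId)) {
            return false; // already handled, so the side effect is not repeated
        }
        this.handle(event.data);
        this.processed.add(event.eventId);
        return true;
    }
}

const seen: string[] = [];
const handler = new IdempotentHandler<{ userId: string }>(data => {
    seen.push(data.userId);
});
const event = { eventId: 'evt-1', ackKey: 'ack-1', data: { userId: '123' } };

handler.process(event); // runs the handler
handler.process(event); // redelivery: skipped
console.log(seen.length); // 1
```

An in-memory `Set` only protects a single process; a shared store would be needed if several consumers can receive the same redelivered message.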
const futureDate = new Date(Date.now() + 60000); // 1 minute from now
await publisher.scheduledPublish(futureDate, eventData);
import { publishBatch } from '@jetit/publisher';
const events = [
    { eventName: 'event1', data: { /* ... */ } },
    { eventName: 'event2', data: { /* ... */ } },
    // ...
];
const result = await publishBatch(publisher, events, { batchSize: 100, delayBetweenBatches: 1000 });
console.log('Batch publish result:', result);
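The `batchSize` and `delayBetweenBatches` options suggest that events are sent in fixed-size groups with a pause between groups. The sketch below shows that general technique only. It is not the implementation of `publishBatch`, and `chunk`, `sleep`, `sendInBatches`, and the `send` callback are hypothetical names.

```typescript
// Split a list into consecutive groups of at most `size` items.
function chunk<T>(items: T[], size: number): T[][] {
    if (size <= 0) throw new Error('size must be positive');
    const groups: T[][] = [];
    for (let i = 0; i < items.length; i += size) {
        groups.push(items.slice(i, i + size));
    }
    return groups;
}

const sleep = (ms: number): Promise<void> =>
    new Promise<void>(resolve => setTimeout(resolve, ms));

// Send each group in order, pausing between groups (but not after the last one).
async function sendInBatches<T>(
    items: T[],
    send: (batch: T[]) => Promise<void>,
    options: { batchSize: number; delayBetweenBatches: number },
): Promise<number> {
    const groups = chunk(items, options.batchSize);
    for (let i = 0; i < groups.length; i++) {
        await send(groups[i]);
        if (i < groups.length - 1) {
            await sleep(options.delayBetweenBatches);
        }
    }
    return groups.length; // number of batches sent
}

console.log(chunk(['a', 'b', 'c', 'd', 'e'], 2).length); // 3
```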
// Retry an event from DLQ
const success = await publisher.retryFromDLQ('eventId');
// Get DLQ stats
const stats = await publisher.getDLQStats();
console.log('DLQ stats:', stats);
const options = {
    eventFilter: (event) => event.data.userId === '123',
    filterKeepAlive: 3600000 // 1 hour
};
publisher.listen('user-action', options).subscribe(event => {
    console.log('Filtered user action:', event);
});
// Get metrics for a specific time range
const metrics = await publisher.getMetrics(startTime, endTime);
console.log('Performance metrics:', metrics);
// Get latest metrics
const latestMetrics = await publisher.getLatestMetrics();
console.log('Latest metrics:', latestMetrics);
import { PrometheusAdapter } from '@jetit/publisher';
import promClient from 'prom-client';
import express from 'express';
const app = express();
const prometheusAdapter = new PrometheusAdapter(publisher, promClient);
prometheusAdapter.setupEndpoint(app, '/metrics');
app.listen(3000, () => {
    console.log('Metrics server listening on port 3000');
});
The library supports content-based deduplication to ensure that each unique event is processed only once:
const options = {
    publishOnceGuarantee: true
};
publisher.listen('important-event', options).subscribe(event => {
    console.log('Guaranteed unique event:', event);
});
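Content-based deduplication generally works by deriving a key from an event's content and rejecting events whose key has already been seen within a time window. A minimal in-memory sketch of that idea follows. The library's actual mechanism is not described in this README, and `contentKey` and `DedupWindow` are hypothetical names. The current time is passed in explicitly so the behaviour is easy to follow.

```typescript
// Build a key that does not depend on object property order.
function contentKey(value: unknown): string {
    if (Array.isArray(value)) {
        return '[' + value.map(contentKey).join(',') + ']';
    }
    if (value !== null && typeof value === 'object') {
        const obj = value as Record<string, unknown>;
        const parts = Object.keys(obj)
            .sort()
            .map(key => JSON.stringify(key) + ':' + contentKey(obj[key]));
        return '{' + parts.join(',') + '}';
    }
    return String(JSON.stringify(value));
}

class DedupWindow {
    private readonly seenAt = new Map<string, number>();
    private readonly windowMs: number;

    constructor(windowMs: number) {
        this.windowMs = windowMs;
    }

    // Returns true the first time a given content is seen inside the window.
    accept(eventName: string, data: unknown, now: number): boolean {
        const key = eventName + '|' + contentKey(data);
        const last = this.seenAt.get(key);
        if (last !== undefined && now - last < this.windowMs) {
            return false; // duplicate within the window
        }
        this.seenAt.set(key, now);
        return true;
    }
}

const dedup = new DedupWindow(1000);
console.log(dedup.accept('important-event', { a: 1, b: 2 }, 0));    // true
console.log(dedup.accept('important-event', { b: 2, a: 1 }, 500));  // false: same content, different key order
console.log(dedup.accept('important-event', { a: 1, b: 2 }, 1500)); // true: the window has elapsed
```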
You can subscribe to multiple events from the same service:
const subscription1 = publisher.listen('event1').subscribe(/* ... */);
const subscription2 = publisher.listen('event2').subscribe(/* ... */);
The Circuit Breaker pattern is implemented to prevent cascading failures in a distributed system. It helps to gracefully handle failures and allows the system to recover without overwhelming failed services.
Configuration options:
- `enabled`: Enable or disable the Circuit Breaker.
- `errorThreshold`: Number of errors before opening the circuit.
- `errorThresholdPercentage`: Percentage of errors to total calls before opening the circuit.
- `timeWindow`: Time window for error rate calculation (in milliseconds).
- `openStateDuration`: Duration to keep the circuit open before moving to half-open state (in milliseconds).
- `halfOpenStateMaxAttempts`: Maximum number of attempts allowed in half-open state.
The Circuit Breaker has three states: closed, open, and half-open.
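The three states and the options listed above can be illustrated with a small state machine. This is a generic sketch of the Circuit Breaker pattern, not the library's implementation. It uses only `errorThreshold`, `openStateDuration`, and `halfOpenStateMaxAttempts` (the percentage and time-window options are left out for brevity), assumes that a success resets the error count and closes the circuit, and takes the current time as an argument. `SimpleCircuitBreaker` is a hypothetical name.

```typescript
type BreakerState = 'closed' | 'open' | 'half-open';

interface BreakerOptions {
    errorThreshold: number;           // errors (since the last success) before opening
    openStateDuration: number;        // ms to stay open before allowing trial calls
    halfOpenStateMaxAttempts: number; // trial calls allowed while half-open
}

class SimpleCircuitBreaker {
    private state: BreakerState = 'closed';
    private errors = 0;
    private openedAt = 0;
    private halfOpenAttempts = 0;
    private readonly options: BreakerOptions;

    constructor(options: BreakerOptions) {
        this.options = options;
    }

    // Current state; an open circuit becomes half-open once openStateDuration has passed.
    getState(now: number): BreakerState {
        if (this.state === 'open' && now - this.openedAt >= this.options.openStateDuration) {
            this.state = 'half-open';
            this.halfOpenAttempts = 0;
        }
        return this.state;
    }

    // Whether a call may be attempted right now.
    canAttempt(now: number): boolean {
        const state = this.getState(now);
        if (state === 'closed') return true;
        if (state === 'open') return false;
        if (this.halfOpenAttempts < this.options.halfOpenStateMaxAttempts) {
            this.halfOpenAttempts++;
            return true;
        }
        return false;
    }

    recordSuccess(): void {
        this.state = 'closed';
        this.errors = 0;
    }

    recordFailure(now: number): void {
        this.errors++;
        if (this.state === 'half-open' || this.errors >= this.options.errorThreshold) {
            this.state = 'open';
            this.openedAt = now;
        }
    }
}

const breaker = new SimpleCircuitBreaker({
    errorThreshold: 2,
    openStateDuration: 30000,
    halfOpenStateMaxAttempts: 1,
});
breaker.recordFailure(0);
breaker.recordFailure(10);
console.log(breaker.getState(20));    // open
console.log(breaker.getState(30010)); // half-open
```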
`xdel` operations for improved cleanup performance
To ensure proper cleanup of resources, implement a graceful shutdown:
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);
async function shutdown() {
    console.log('Graceful shutdown initiated.');
    try {
        await publisher.close();
        console.log('Resources and connections successfully closed.');
    } catch (error) {
        console.error('Error during graceful shutdown:', error);
    }
    process.exit(0);
}
If you encounter issues: