@bernierllc/message-queue
Atomic message queuing utilities for message ordering and persistence.
Features
- Priority-based Message Ordering: Messages are processed in priority order (URGENT > HIGH > NORMAL > LOW)
- Configurable Retry Logic: Automatic retry with exponential backoff for failed message processing
- Message Expiration: TTL (Time To Live) support for automatic message cleanup
- Event System: Real-time events for queue state changes
- Message Filtering: Filter messages by priority, timestamp, metadata, and more
- Queue Statistics: Comprehensive metrics and monitoring
- Queue Management: Pause, resume, and clear queue operations
- TypeScript Support: Full type safety with comprehensive interfaces
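The priority ordering listed above can be illustrated with a small standalone sketch. It is not the package's implementation. It assumes only that higher-priority messages are taken first and, as a further assumption, that ties are broken by insertion order. The names Priority, SketchMessage, and takeNext are hypothetical.

```typescript
// Stand-in for the package's MessagePriority enum (same numeric values).
const Priority = { LOW: 0, NORMAL: 1, HIGH: 2, URGENT: 3 } as const;
type PriorityValue = (typeof Priority)[keyof typeof Priority];

interface SketchMessage {
  content: string;
  priority: PriorityValue;
  seq: number; // insertion order, used as an assumed FIFO tie-breaker
}

// Remove and return the highest-priority message; earlier messages win ties.
function takeNext(pendingMessages: SketchMessage[]): SketchMessage | undefined {
  if (pendingMessages.length === 0) return undefined;
  let best = 0;
  for (let i = 1; i < pendingMessages.length; i++) {
    const candidate = pendingMessages[i];
    const current = pendingMessages[best];
    const higher = candidate.priority > current.priority;
    const sameButEarlier =
      candidate.priority === current.priority && candidate.seq < current.seq;
    if (higher || sameButEarlier) best = i;
  }
  return pendingMessages.splice(best, 1)[0];
}

const pendingMessages: SketchMessage[] = [
  { content: 'low', priority: Priority.LOW, seq: 0 },
  { content: 'urgent', priority: Priority.URGENT, seq: 1 },
  { content: 'normal-a', priority: Priority.NORMAL, seq: 2 },
  { content: 'normal-b', priority: Priority.NORMAL, seq: 3 },
];

// Drain the list and record the order in which messages come out.
const drainOrder: string[] = [];
let nextMessage = takeNext(pendingMessages);
while (nextMessage !== undefined) {
  drainOrder.push(nextMessage.content);
  nextMessage = takeNext(pendingMessages);
}
console.log(drainOrder.join(', ')); // urgent, normal-a, normal-b, low
```

A linear scan keeps the sketch short. A heap or one list per priority level would be the usual choice for larger queues.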
Installation
npm install @bernierllc/message-queue
Dependencies: This package depends on @bernierllc/retry-policy for exponential backoff calculations. It is installed automatically.
Quick Start
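import { MessageQueue, MessagePriority } from '@bernierllc/message-queue';
// Create a queue that holds at most 1000 messages and retries failures up to 3 times.
const queue = new MessageQueue({
name: 'my-queue',
maxSize: 1000,
enableRetries: true,
maxRetries: 3
});
// Add two messages; the URGENT one is processed ahead of the NORMAL one.
queue.addMessage('Hello World', MessagePriority.NORMAL);
queue.addMessage('Urgent task', MessagePriority.URGENT);
// Processor function that the queue calls with the message to handle.
const processor = async (message) => {
console.log(`Processing: ${message.content}`);
return true;
};
// Process a message with the processor.
await queue.processMessage(processor);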
API Reference
MessageQueue
The main class for managing message queues.
Constructor
new MessageQueue(config: QueueConfig, options?: QueueOptions)
QueueConfig:
name: string
- Queue name (required)
maxSize?: number
- Maximum number of messages in queue
defaultPriority?: MessagePriority
- Default priority for messages
defaultTtl?: number
- Default time-to-live in milliseconds
enableRetries?: boolean
- Enable automatic retry on failure
maxRetries?: number
- Maximum number of retry attempts
retryDelay?: number
- Base delay between retries in milliseconds (default: 1000)
retryMaxDelay?: number
- Maximum delay between retries in milliseconds (default: 10x retryDelay)
retryJitter?: boolean
- Whether to add jitter to retry delays (default: true)
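To show how retryDelay, retryMaxDelay, and retryJitter might interact, here is a minimal sketch of a capped exponential backoff. The exact formula used by @bernierllc/retry-policy is not stated in this document, so the doubling factor and the jitter range below are assumptions, and backoffDelay and BackoffSettings are hypothetical names.

```typescript
interface BackoffSettings {
  retryDelay: number;    // base delay in milliseconds
  retryMaxDelay: number; // upper bound in milliseconds
  retryJitter: boolean;  // randomize the delay when true
}

// Delay before retry number `attempt` (1-based). `random` is injectable so the
// result can be made deterministic when needed.
function backoffDelay(
  attempt: number,
  settings: BackoffSettings,
  random: () => number = Math.random,
): number {
  // Assumed growth: the delay doubles on each attempt, then is capped.
  const exponential = settings.retryDelay * 2 ** (attempt - 1);
  const capped = Math.min(exponential, settings.retryMaxDelay);
  if (!settings.retryJitter) return capped;
  // Assumed jitter scheme: pick a value between capped / 2 and capped.
  return Math.round(capped / 2 + random() * (capped / 2));
}

const backoffSettings: BackoffSettings = {
  retryDelay: 1000,
  retryMaxDelay: 10000,
  retryJitter: false,
};

const backoffSchedule = [1, 2, 3, 4, 5].map((attempt) =>
  backoffDelay(attempt, backoffSettings),
);
console.log(backoffSchedule); // [1000, 2000, 4000, 8000, 10000]
```

Jitter spreads retries out so that many messages failing at once do not all retry at the same instant.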
QueueOptions:
enableEvents?: boolean
- Enable event system (default: true)
enableStats?: boolean
- Enable statistics tracking (default: true)
enablePersistence?: boolean
- Enable persistence (default: false)
Methods
addMessage(content, priority?, metadata?)
Add a message to the queue.
const result = queue.addMessage('message content', MessagePriority.HIGH, { userId: '123' });
getNextMessage()
Remove and return the next message from the queue.
const message = queue.getNextMessage();
processMessage(processor)
Process a message with the provided processor function.
const processor = async (message: Message) => {
return true;
};
const result = await queue.processMessage(processor);
getMessages(filter?)
Get messages matching the filter criteria.
const messages = queue.getMessages({
priority: MessagePriority.HIGH,
fromTimestamp: new Date('2023-01-01'),
limit: 10,
offset: 0
});
removeMessage(messageId)
Remove a specific message from the queue.
const result = queue.removeMessage('msg_1234567890_abc123');
clear()
Clear all messages from the queue.
const result = queue.clear();
getStats()
Get queue statistics.
const stats = queue.getStats();
getInfo()
Get comprehensive queue information.
const info = queue.getInfo();
onEvent(handler)
Add an event handler.
queue.onEvent((event: QueueEvent) => {
console.log(`Event: ${event.type} - ${event.messageId}`);
});
offEvent(handler)
Remove an event handler.
queue.offEvent(handler);
pause()
Pause the queue (no new messages can be added).
queue.pause();
resume()
Resume a paused queue so that new messages can be added again.
queue.resume();
Properties
size: number
- Current number of messages in queue
isEmpty: boolean
- Whether queue is empty
isFull: boolean
- Whether queue is at maximum capacity
Types
MessagePriority
enum MessagePriority {
LOW = 0,
NORMAL = 1,
HIGH = 2,
URGENT = 3
}
Message
interface Message {
id: string;
content: any;
priority: MessagePriority;
timestamp: Date;
expiresAt?: Date;
metadata?: Record<string, any>;
retryCount?: number;
maxRetries?: number;
}
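The expiresAt field is what makes TTL-based cleanup possible. The sketch below shows one way an expiry time could be derived from a TTL and checked later. It assumes that expiresAt equals the message timestamp plus the TTL and that a message without expiresAt never expires. Neither assumption is confirmed by this document, and TimedMessage, withTtl, and isExpired are hypothetical names.

```typescript
interface TimedMessage {
  id: string;
  timestamp: Date;
  expiresAt?: Date;
}

// Build a message, deriving expiresAt from a TTL in milliseconds when given.
function withTtl(id: string, timestamp: Date, ttlMs?: number): TimedMessage {
  return ttlMs === undefined
    ? { id, timestamp }
    : { id, timestamp, expiresAt: new Date(timestamp.getTime() + ttlMs) };
}

// A message without expiresAt is treated as never expiring.
function isExpired(message: TimedMessage, now: Date): boolean {
  return (
    message.expiresAt !== undefined &&
    message.expiresAt.getTime() <= now.getTime()
  );
}

const createdAt = new Date('2023-01-01T00:00:00Z');
const timedMessages = [
  withTtl('short', createdAt, 1000),  // expires after 1 second
  withTtl('long', createdAt, 60000),  // expires after 60 seconds
  withTtl('forever', createdAt),      // no TTL
];

// Five seconds later only the short-lived message has expired.
const checkTime = new Date('2023-01-01T00:00:05Z');
const liveIds = timedMessages
  .filter((message) => !isExpired(message, checkTime))
  .map((message) => message.id);
console.log(liveIds); // ['long', 'forever']
```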
QueueEvent
interface QueueEvent {
type: 'message_added' | 'message_processed' | 'message_failed' | 'queue_full' | 'queue_empty';
messageId?: string;
timestamp: Date;
data?: any;
}
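A handler for these events can stay very small. The sketch below tallies events by type. It defines its own SketchEvent type that mirrors the fields above so that it runs on its own, and createEventCounter is a hypothetical helper rather than part of the package.

```typescript
type SketchEventType =
  | 'message_added'
  | 'message_processed'
  | 'message_failed'
  | 'queue_full'
  | 'queue_empty';

interface SketchEvent {
  type: SketchEventType;
  messageId?: string;
  timestamp: Date;
}

// Returns a handler plus a way to read the tallies it has collected.
function createEventCounter() {
  const counts = new Map<SketchEventType, number>();
  return {
    handler(event: SketchEvent): void {
      counts.set(event.type, (counts.get(event.type) ?? 0) + 1);
    },
    count(type: SketchEventType): number {
      return counts.get(type) ?? 0;
    },
  };
}

const eventCounter = createEventCounter();

// Simulated events; with the real queue the handler would presumably be
// registered through queue.onEvent(eventCounter.handler).
eventCounter.handler({ type: 'message_added', messageId: 'm1', timestamp: new Date() });
eventCounter.handler({ type: 'message_added', messageId: 'm2', timestamp: new Date() });
eventCounter.handler({ type: 'message_processed', messageId: 'm1', timestamp: new Date() });

console.log(eventCounter.count('message_added'));     // 2
console.log(eventCounter.count('message_processed')); // 1
console.log(eventCounter.count('queue_full'));        // 0
```

Because the handler only updates a map, it does no I/O and returns immediately.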
MessageFilter
interface MessageFilter {
priority?: MessagePriority;
fromTimestamp?: Date;
toTimestamp?: Date;
metadata?: Record<string, any>;
limit?: number;
offset?: number;
}
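How the filter fields combine is not spelled out here. The sketch below shows one plausible reading: all supplied fields must match, metadata is compared key by key with strict equality, and offset and limit are applied after matching. These semantics are assumptions, and FilterableMessage, SketchFilter, matchesFilter, and applyFilter are hypothetical names.

```typescript
interface FilterableMessage {
  id: string;
  priority: number;
  timestamp: Date;
  metadata?: Record<string, unknown>;
}

interface SketchFilter {
  priority?: number;
  fromTimestamp?: Date;
  toTimestamp?: Date;
  metadata?: Record<string, unknown>;
  limit?: number;
  offset?: number;
}

// True when the message satisfies every field present in the filter.
function matchesFilter(message: FilterableMessage, filter: SketchFilter): boolean {
  const time = message.timestamp.getTime();
  if (filter.priority !== undefined && message.priority !== filter.priority) return false;
  if (filter.fromTimestamp && time < filter.fromTimestamp.getTime()) return false;
  if (filter.toTimestamp && time > filter.toTimestamp.getTime()) return false;
  if (filter.metadata) {
    const meta = message.metadata ?? {};
    // Assumed: every key in the filter must be strictly equal in the message.
    for (const [key, value] of Object.entries(filter.metadata)) {
      if (meta[key] !== value) return false;
    }
  }
  return true;
}

// Match first, then paginate with offset and limit.
function applyFilter(
  messages: FilterableMessage[],
  filter: SketchFilter = {},
): FilterableMessage[] {
  const offset = filter.offset ?? 0;
  const end = filter.limit === undefined ? undefined : offset + filter.limit;
  return messages.filter((m) => matchesFilter(m, filter)).slice(offset, end);
}

const sampleMessages: FilterableMessage[] = [
  { id: 'a', priority: 2, timestamp: new Date('2023-01-01'), metadata: { type: 'notification', userId: '123' } },
  { id: 'b', priority: 0, timestamp: new Date('2023-02-01'), metadata: { type: 'log' } },
  { id: 'c', priority: 2, timestamp: new Date('2023-03-01'), metadata: { type: 'notification' } },
];

const notificationIds = applyFilter(sampleMessages, { metadata: { type: 'notification' } }).map((m) => m.id);
const secondHighId = applyFilter(sampleMessages, { priority: 2, offset: 1, limit: 1 }).map((m) => m.id);
console.log(notificationIds, secondHighId); // ['a', 'c'] ['c']
```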
Examples
Basic Usage
import { MessageQueue, MessagePriority } from '@bernierllc/message-queue';
const queue = new MessageQueue({
name: 'email-queue',
maxSize: 1000,
enableRetries: true,
maxRetries: 3
});
queue.addMessage({ to: 'user@example.com', subject: 'Welcome' }, MessagePriority.NORMAL);
queue.addMessage({ to: 'admin@example.com', subject: 'Alert' }, MessagePriority.HIGH);
const emailProcessor = async (message) => {
console.log(`Sending email: ${message.content.subject}`);
return true;
};
while (!queue.isEmpty) {
await queue.processMessage(emailProcessor);
}
Event-Driven Processing
const queue = new MessageQueue({ name: 'event-queue' });
queue.onEvent((event) => {
switch (event.type) {
case 'message_added':
console.log(`Message added: ${event.messageId}`);
break;
case 'message_processed':
console.log(`Message processed: ${event.messageId}`);
break;
case 'queue_full':
console.log('Queue is full!');
break;
}
});
queue.addMessage('Event message 1');
queue.addMessage('Event message 2');
Message Filtering
const queue = new MessageQueue({ name: 'filter-queue' });
queue.addMessage('User notification', MessagePriority.HIGH, {
type: 'notification',
userId: '123'
});
queue.addMessage('System log', MessagePriority.LOW, {
type: 'log',
system: 'auth'
});
const notifications = queue.getMessages({
metadata: { type: 'notification' }
});
const highPriority = queue.getMessages({
priority: MessagePriority.HIGH
});
const paginated = queue.getMessages({
limit: 10,
offset: 0
});
Retry Logic with Exponential Backoff
const queue = new MessageQueue({
name: 'retry-queue',
enableRetries: true,
maxRetries: 3,
retryDelay: 1000,
retryMaxDelay: 10000,
retryJitter: true
});
queue.addMessage('Failing task');
const processor = async (message) => {
if (Math.random() < 0.7) {
throw new Error('Processing failed');
}
return true;
};
await queue.processMessage(processor);
Error Handling
Operations such as addMessage and processMessage return a result object. Check its success flag and read error when the operation failed:
const result = queue.addMessage('test');
if (!result.success) {
console.error('Failed to add message:', result.error);
}
const processResult = await queue.processMessage(processor);
if (!processResult.success) {
console.error('Failed to process message:', processResult.error);
}
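When several operations are checked in a row, the repeated success test can be moved into a helper. The sketch below assumes a result shape with only the success and error fields used above. The full result type is not documented here, and SketchResult and ensureSuccess are hypothetical names.

```typescript
// Assumed result shape, inferred from the `success` and `error` fields used above.
interface SketchResult {
  success: boolean;
  error?: unknown;
}

// Turn a failed result into a thrown Error so callers can use try/catch.
function ensureSuccess(result: SketchResult, action: string): void {
  if (!result.success) {
    throw new Error(`${action} failed: ${String(result.error ?? 'unknown error')}`);
  }
}

// Simulated results standing in for what queue operations would return.
const simulatedResults: SketchResult[] = [
  { success: true },
  { success: false, error: 'simulated failure' },
];

const outcomes: string[] = [];
for (const simulated of simulatedResults) {
  try {
    ensureSuccess(simulated, 'addMessage');
    outcomes.push('ok');
  } catch (err) {
    outcomes.push((err as Error).message);
  }
}
console.log(outcomes); // ['ok', 'addMessage failed: simulated failure']
```

Whether to throw or keep checking result.success inline is a matter of taste. Throwing suits call sites that already sit inside a try/catch block.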
Performance Considerations
- Memory Usage: Messages are stored in memory by default. For large queues, consider implementing persistence.
- Processing Speed: The queue processes messages sequentially. For high-throughput scenarios, consider using multiple queues or workers.
- Event Handlers: Keep event handlers lightweight to avoid blocking queue operations.
License
This file is licensed to the client under a limited-use license.
The client may use and modify this code only within the scope of the project it was delivered for.
Redistribution or use in other products or commercial offerings is not permitted without written consent from Bernier LLC.