
@bernierllc/message-queue
Atomic message queuing utilities for message ordering and persistence.
npm install @bernierllc/message-queue
Dependencies:
This package depends on @bernierllc/retry-policy for exponential backoff calculations. It will be installed automatically.
import { MessageQueue, MessagePriority } from '@bernierllc/message-queue';

// Create a queue
const queue = new MessageQueue({
  name: 'my-queue',
  maxSize: 1000,
  enableRetries: true,
  maxRetries: 3
});

// Add messages
queue.addMessage('Hello World', MessagePriority.NORMAL);
queue.addMessage('Urgent task', MessagePriority.URGENT);

// Process messages
const processor = async (message) => {
  console.log(`Processing: ${message.content}`);
  // Do some work...
  return true; // Success
};

await queue.processMessage(processor);
The main class for managing message queues.
new MessageQueue(config: QueueConfig, options?: QueueOptions)
QueueConfig:
name: string - Queue name (required)
maxSize?: number - Maximum number of messages in queue
defaultPriority?: MessagePriority - Default priority for messages
defaultTtl?: number - Default time-to-live in milliseconds
enableRetries?: boolean - Enable automatic retry on failure
maxRetries?: number - Maximum number of retry attempts
retryDelay?: number - Base delay between retries in milliseconds (default: 1000)
retryMaxDelay?: number - Maximum delay between retries in milliseconds (default: 10x retryDelay)
retryJitter?: boolean - Whether to add jitter to retry delays (default: true)

QueueOptions:
enableEvents?: boolean - Enable event system (default: true)
enableStats?: boolean - Enable statistics tracking (default: true)
enablePersistence?: boolean - Enable persistence (default: false)

Add a message to the queue.
const result = queue.addMessage('message content', MessagePriority.HIGH, { userId: '123' });
// Returns: { success: boolean, messageId?: string, error?: string, stats?: QueueStats }
Get the next message from the queue (removes it from queue).
const message = queue.getNextMessage();
// Returns: Message | null
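The text above does not spell out the order in which messages leave the queue. The following is a minimal sketch, assuming that higher-priority messages are returned first and that messages of equal priority come out in insertion order. `Priority` and `SketchQueue` are hypothetical stand-ins written for this illustration; they are not part of the package.

```typescript
// Hypothetical stand-in for the documented MessagePriority values.
enum Priority { LOW = 0, NORMAL = 1, HIGH = 2, URGENT = 3 }

interface Entry<T> {
  content: T;
  priority: Priority;
  seq: number; // insertion counter, used to keep FIFO order within a priority
}

// Illustrative only: not the package's implementation.
class SketchQueue<T> {
  private entries: Entry<T>[] = [];
  private nextSeq = 0;

  add(content: T, priority: Priority = Priority.NORMAL): void {
    this.entries.push({ content, priority, seq: this.nextSeq++ });
    // Assumed ordering: highest priority first, then oldest first.
    this.entries.sort((a, b) => b.priority - a.priority || a.seq - b.seq);
  }

  // Mirrors the documented "Message | null" contract: null when empty.
  next(): T | null {
    const entry = this.entries.shift();
    return entry ? entry.content : null;
  }
}

const sketch = new SketchQueue<string>();
sketch.add('routine', Priority.NORMAL);
sketch.add('urgent', Priority.URGENT);
sketch.add('routine 2', Priority.NORMAL);

const order = [sketch.next(), sketch.next(), sketch.next(), sketch.next()];
console.log(order); // → [ 'urgent', 'routine', 'routine 2', null ]
```

If the real queue orders messages differently (for example strictly by arrival time), only the comparator in `add` would change.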
Process a message with the provided processor function.
const processor = async (message: Message) => {
  // Process the message
  return true; // Return true for success, false for failure
};

const result = await queue.processMessage(processor);
// Returns: { success: boolean, error?: string, stats?: QueueStats }
Get messages matching the filter criteria.
const messages = queue.getMessages({
  priority: MessagePriority.HIGH,
  fromTimestamp: new Date('2023-01-01'),
  limit: 10,
  offset: 0
});
Remove a specific message from the queue.
const result = queue.removeMessage('msg_1234567890_abc123');
// Returns: { success: boolean, error?: string, stats?: QueueStats }
Clear all messages from the queue.
const result = queue.clear();
// Returns: { success: boolean, stats?: QueueStats }
Get queue statistics.
const stats = queue.getStats();
// Returns: QueueStats
Get comprehensive queue information.
const info = queue.getInfo();
// Returns: QueueInfo
Add an event handler.
const handler = (event: QueueEvent) => {
  console.log(`Event: ${event.type} - ${event.messageId}`);
};
queue.onEvent(handler);
Remove an event handler.
queue.offEvent(handler); // pass the same function reference that was given to onEvent
Pause the queue (no new messages can be added).
queue.pause();
Resume the queue.
queue.resume();
size: number - Current number of messages in queue
isEmpty: boolean - Whether queue is empty
isFull: boolean - Whether queue is at maximum capacity

enum MessagePriority {
  LOW = 0,
  NORMAL = 1,
  HIGH = 2,
  URGENT = 3
}

interface Message {
  id: string;
  content: any;
  priority: MessagePriority;
  timestamp: Date;
  expiresAt?: Date;
  metadata?: Record<string, any>;
  retryCount?: number;
  maxRetries?: number;
}

interface QueueEvent {
  type: 'message_added' | 'message_processed' | 'message_failed' | 'queue_full' | 'queue_empty';
  messageId?: string;
  timestamp: Date;
  data?: any;
}

interface MessageFilter {
  priority?: MessagePriority;
  fromTimestamp?: Date;
  toTimestamp?: Date;
  metadata?: Record<string, any>;
  limit?: number;
  offset?: number;
}
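The MessageFilter fields suggest how getMessages might select messages, though the exact matching rules are not documented here. The sketch below makes three assumptions: all supplied criteria must match, timestamp bounds are inclusive, and a metadata filter matches when every listed key is strictly equal. `SketchMessage`, `SketchFilter`, `matchesFilter`, and `applyFilter` are hypothetical names for this illustration only.

```typescript
// Local copies of the documented shapes, trimmed to the fields a filter reads.
interface SketchMessage {
  id: string;
  priority: number;
  timestamp: Date;
  metadata?: Record<string, unknown>;
}

interface SketchFilter {
  priority?: number;
  fromTimestamp?: Date;
  toTimestamp?: Date;
  metadata?: Record<string, unknown>;
  limit?: number;
  offset?: number;
}

// Assumed semantics: criteria are ANDed, bounds are inclusive,
// metadata keys are compared with strict equality.
function matchesFilter(msg: SketchMessage, f: SketchFilter): boolean {
  if (f.priority !== undefined && msg.priority !== f.priority) return false;
  const time = msg.timestamp.getTime();
  if (f.fromTimestamp && time < f.fromTimestamp.getTime()) return false;
  if (f.toTimestamp && time > f.toTimestamp.getTime()) return false;
  if (f.metadata) {
    const wanted = f.metadata;
    const actual = msg.metadata ?? {};
    return Object.keys(wanted).every((key) => actual[key] === wanted[key]);
  }
  return true;
}

// Pagination is applied after filtering: skip `offset`, then take `limit`.
function applyFilter(msgs: SketchMessage[], f: SketchFilter): SketchMessage[] {
  const matched = msgs.filter((m) => matchesFilter(m, f));
  const start = f.offset ?? 0;
  const end = f.limit === undefined ? undefined : start + f.limit;
  return matched.slice(start, end);
}

const inbox: SketchMessage[] = [
  { id: 'a', priority: 2, timestamp: new Date('2023-01-02'), metadata: { type: 'notification', userId: '123' } },
  { id: 'b', priority: 0, timestamp: new Date('2023-01-03'), metadata: { type: 'log', system: 'auth' } },
  { id: 'c', priority: 2, timestamp: new Date('2022-12-31') },
];

const hits = applyFilter(inbox, { priority: 2, fromTimestamp: new Date('2023-01-01'), limit: 10, offset: 0 });
console.log(hits.map((m) => m.id)); // → [ 'a' ]
```

Applying `offset` and `limit` after the match step keeps page boundaries stable for a given filter, which is the usual expectation for paginated queries.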
import { MessageQueue, MessagePriority } from '@bernierllc/message-queue';

const queue = new MessageQueue({
  name: 'email-queue',
  maxSize: 1000,
  enableRetries: true,
  maxRetries: 3
});

// Add messages
queue.addMessage({ to: 'user@example.com', subject: 'Welcome' }, MessagePriority.NORMAL);
queue.addMessage({ to: 'admin@example.com', subject: 'Alert' }, MessagePriority.HIGH);

// Process messages
const emailProcessor = async (message) => {
  console.log(`Sending email: ${message.content.subject}`);
  // Send email logic here
  return true;
};

while (!queue.isEmpty) {
  await queue.processMessage(emailProcessor);
}
const queue = new MessageQueue({ name: 'event-queue' });

// Set up event handlers
queue.onEvent((event) => {
  switch (event.type) {
    case 'message_added':
      console.log(`Message added: ${event.messageId}`);
      break;
    case 'message_processed':
      console.log(`Message processed: ${event.messageId}`);
      break;
    case 'queue_full':
      console.log('Queue is full!');
      break;
  }
});

// Add messages
queue.addMessage('Event message 1');
queue.addMessage('Event message 2');
const queue = new MessageQueue({ name: 'filter-queue' });

// Add messages with metadata
queue.addMessage('User notification', MessagePriority.HIGH, {
  type: 'notification',
  userId: '123'
});
queue.addMessage('System log', MessagePriority.LOW, {
  type: 'log',
  system: 'auth'
});

// Filter by metadata
const notifications = queue.getMessages({
  metadata: { type: 'notification' }
});

// Filter by priority
const highPriority = queue.getMessages({
  priority: MessagePriority.HIGH
});

// Filter with pagination
const paginated = queue.getMessages({
  limit: 10,
  offset: 0
});
const queue = new MessageQueue({
  name: 'retry-queue',
  enableRetries: true,
  maxRetries: 3,
  retryDelay: 1000,     // Base delay: 1 second
  retryMaxDelay: 10000, // Max delay: 10 seconds
  retryJitter: true     // Add jitter to prevent thundering herd
});

queue.addMessage('Failing task');

const processor = async (message) => {
  // Simulate unreliable processing
  if (Math.random() < 0.7) {
    throw new Error('Processing failed');
  }
  return true;
};

await queue.processMessage(processor);

// Message will be retried with exponential backoff:
// Attempt 1: ~1 second delay
// Attempt 2: ~2 seconds delay
// Attempt 3: ~4 seconds delay
// Uses @bernierllc/retry-policy for backoff calculation
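The delays listed above follow the usual doubling pattern, capped at retryMaxDelay. The sketch below shows one way such a schedule could be computed. `backoffDelay` is a hypothetical helper, not the API of @bernierllc/retry-policy, and the ±10% jitter range is an assumption chosen for illustration; the package's actual jitter scheme is not documented here.

```typescript
// Hypothetical helper: NOT the API of @bernierllc/retry-policy.
function backoffDelay(
  attempt: number,                       // 1-based retry attempt
  baseDelay = 1000,                      // corresponds to retryDelay
  maxDelay = baseDelay * 10,             // corresponds to retryMaxDelay (documented default: 10x retryDelay)
  jitter = false,                        // corresponds to retryJitter
  random: () => number = Math.random     // injectable so the result can be made deterministic
): number {
  // Double the base delay on each attempt, then cap it.
  const exponential = baseDelay * 2 ** (attempt - 1);
  const capped = Math.min(exponential, maxDelay);
  if (!jitter) return capped;
  // Assumed jitter scheme: scale by a random factor in [0.9, 1.1), never above the cap.
  const jittered = capped * (0.9 + 0.2 * random());
  return Math.round(Math.min(jittered, maxDelay));
}

const schedule = [1, 2, 3, 4, 5].map((n) => backoffDelay(n, 1000, 10000));
console.log(schedule); // → [ 1000, 2000, 4000, 8000, 10000 ]
```

Jitter spreads retries from many clients over time so they do not all hit a recovering service at the same instant, which is the "thundering herd" the retryJitter comment refers to.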
Queue operations return a result object with a success flag and, on failure, an error message:
const result = queue.addMessage('test');
if (!result.success) {
  console.error('Failed to add message:', result.error);
}

const processResult = await queue.processMessage(processor);
if (!processResult.success) {
  console.error('Failed to process message:', processResult.error);
}
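When the same check is repeated at many call sites, it can be wrapped in a small helper. The following is a sketch only: `OpResult` mirrors the documented result fields, and `assertOk` is a hypothetical helper that is not provided by the package.

```typescript
// Shape shared by the documented result objects (stats omitted for brevity).
interface OpResult {
  success: boolean;
  error?: string;
  messageId?: string;
}

// Hypothetical helper: turns a failed result into a thrown Error.
function assertOk(result: OpResult, action: string): void {
  if (!result.success) {
    throw new Error(`${action} failed: ${result.error ?? 'unknown error'}`);
  }
}

// Usage with hand-written results, standing in for queue.addMessage(...) return values.
assertOk({ success: true, messageId: 'msg_1' }, 'addMessage'); // passes silently

try {
  assertOk({ success: false, error: 'queue full' }, 'addMessage');
} catch (err) {
  console.error((err as Error).message); // prints "addMessage failed: queue full"
}
```

Whether to throw or to keep checking `success` inline is a matter of taste; throwing suits code that already relies on try/catch around a batch of queue operations.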
This file is licensed to the client under a limited-use license. The client may use and modify this code only within the scope of the project it was delivered for. Redistribution or use in other products or commercial offerings is not permitted without written consent from Bernier LLC.