Socket
Book a DemoInstallSign in
Socket

@bernierllc/message-queue

Package Overview
Dependencies
Maintainers
2
Versions
5
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@bernierllc/message-queue

Atomic message queuing utilities for message ordering and persistence

0.2.6
latest
Source
npmnpm
Version published
Maintainers
2
Created
Source

@bernierllc/message-queue

Atomic message queuing utilities for message ordering and persistence.

Features

  • Priority-based Message Ordering: Messages are processed in priority order (URGENT > HIGH > NORMAL > LOW)
  • Configurable Retry Logic: Automatic retry with exponential backoff for failed message processing
  • Message Expiration: TTL (Time To Live) support for automatic message cleanup
  • Event System: Real-time events for queue state changes
  • Message Filtering: Filter messages by priority, timestamp, metadata, and more
  • Queue Statistics: Comprehensive metrics and monitoring
  • Queue Management: Pause, resume, and clear queue operations
  • TypeScript Support: Full type safety with comprehensive interfaces

Installation

npm install @bernierllc/message-queue

Dependencies: This package depends on @bernierllc/retry-policy for exponential backoff calculations. It will be installed automatically.

Quick Start

import { MessageQueue, MessagePriority } from '@bernierllc/message-queue';

// Create a queue
const queue = new MessageQueue({
  name: 'my-queue',
  maxSize: 1000,
  enableRetries: true,
  maxRetries: 3
});

// Add messages
queue.addMessage('Hello World', MessagePriority.NORMAL);
queue.addMessage('Urgent task', MessagePriority.URGENT);

// Process messages
const processor = async (message) => {
  console.log(`Processing: ${message.content}`);
  // Do some work...
  return true; // Success
};

await queue.processMessage(processor);

API Reference

MessageQueue

The main class for managing message queues.

Constructor

new MessageQueue(config: QueueConfig, options?: QueueOptions)

QueueConfig:

  • name: string - Queue name (required)
  • maxSize?: number - Maximum number of messages in queue
  • defaultPriority?: MessagePriority - Default priority for messages
  • defaultTtl?: number - Default time-to-live in milliseconds
  • enableRetries?: boolean - Enable automatic retry on failure
  • maxRetries?: number - Maximum number of retry attempts
  • retryDelay?: number - Base delay between retries in milliseconds (default: 1000)
  • retryMaxDelay?: number - Maximum delay between retries in milliseconds (default: 10x retryDelay)
  • retryJitter?: boolean - Whether to add jitter to retry delays (default: true)

QueueOptions:

  • enableEvents?: boolean - Enable event system (default: true)
  • enableStats?: boolean - Enable statistics tracking (default: true)
  • enablePersistence?: boolean - Enable persistence (default: false)

Methods

addMessage(content, priority?, metadata?)

Add a message to the queue.

const result = queue.addMessage('message content', MessagePriority.HIGH, { userId: '123' });
// Returns: { success: boolean, messageId?: string, error?: string, stats?: QueueStats }
getNextMessage()

Get the next message from the queue (removes it from queue).

const message = queue.getNextMessage();
// Returns: Message | null
processMessage(processor)

Process a message with the provided processor function.

const processor = async (message: Message) => {
  // Process the message
  return true; // Return true for success, false for failure
};

const result = await queue.processMessage(processor);
// Returns: { success: boolean, error?: string, stats?: QueueStats }
getMessages(filter?)

Get messages matching the filter criteria.

const messages = queue.getMessages({
  priority: MessagePriority.HIGH,
  fromTimestamp: new Date('2023-01-01'),
  limit: 10,
  offset: 0
});
removeMessage(messageId)

Remove a specific message from the queue.

const result = queue.removeMessage('msg_1234567890_abc123');
// Returns: { success: boolean, error?: string, stats?: QueueStats }
clear()

Clear all messages from the queue.

const result = queue.clear();
// Returns: { success: boolean, stats?: QueueStats }
getStats()

Get queue statistics.

const stats = queue.getStats();
// Returns: QueueStats
getInfo()

Get comprehensive queue information.

const info = queue.getInfo();
// Returns: QueueInfo
onEvent(handler)

Add an event handler.

queue.onEvent((event: QueueEvent) => {
  console.log(`Event: ${event.type} - ${event.messageId}`);
});
offEvent(handler)

Remove an event handler.

queue.offEvent(handler);
pause()

Pause the queue (no new messages can be added).

queue.pause();
resume()

Resume the queue.

queue.resume();

Properties

  • size: number - Current number of messages in queue
  • isEmpty: boolean - Whether queue is empty
  • isFull: boolean - Whether queue is at maximum capacity

Types

MessagePriority

enum MessagePriority {
  LOW = 0,
  NORMAL = 1,
  HIGH = 2,
  URGENT = 3
}

Message

interface Message {
  id: string;
  content: any;
  priority: MessagePriority;
  timestamp: Date;
  expiresAt?: Date;
  metadata?: Record<string, any>;
  retryCount?: number;
  maxRetries?: number;
}

QueueEvent

interface QueueEvent {
  type: 'message_added' | 'message_processed' | 'message_failed' | 'queue_full' | 'queue_empty';
  messageId?: string;
  timestamp: Date;
  data?: any;
}

MessageFilter

interface MessageFilter {
  priority?: MessagePriority;
  fromTimestamp?: Date;
  toTimestamp?: Date;
  metadata?: Record<string, any>;
  limit?: number;
  offset?: number;
}

Examples

Basic Usage

import { MessageQueue, MessagePriority } from '@bernierllc/message-queue';

const queue = new MessageQueue({
  name: 'email-queue',
  maxSize: 1000,
  enableRetries: true,
  maxRetries: 3
});

// Add messages
queue.addMessage({ to: 'user@example.com', subject: 'Welcome' }, MessagePriority.NORMAL);
queue.addMessage({ to: 'admin@example.com', subject: 'Alert' }, MessagePriority.HIGH);

// Process messages
const emailProcessor = async (message) => {
  console.log(`Sending email: ${message.content.subject}`);
  // Send email logic here
  return true;
};

while (!queue.isEmpty) {
  await queue.processMessage(emailProcessor);
}

Event-Driven Processing

const queue = new MessageQueue({ name: 'event-queue' });

// Set up event handlers
queue.onEvent((event) => {
  switch (event.type) {
    case 'message_added':
      console.log(`Message added: ${event.messageId}`);
      break;
    case 'message_processed':
      console.log(`Message processed: ${event.messageId}`);
      break;
    case 'queue_full':
      console.log('Queue is full!');
      break;
  }
});

// Add messages
queue.addMessage('Event message 1');
queue.addMessage('Event message 2');

Message Filtering

const queue = new MessageQueue({ name: 'filter-queue' });

// Add messages with metadata
queue.addMessage('User notification', MessagePriority.HIGH, { 
  type: 'notification', 
  userId: '123' 
});

queue.addMessage('System log', MessagePriority.LOW, { 
  type: 'log', 
  system: 'auth' 
});

// Filter by metadata
const notifications = queue.getMessages({ 
  metadata: { type: 'notification' } 
});

// Filter by priority
const highPriority = queue.getMessages({ 
  priority: MessagePriority.HIGH 
});

// Filter with pagination
const paginated = queue.getMessages({ 
  limit: 10, 
  offset: 0 
});

Retry Logic with Exponential Backoff

const queue = new MessageQueue({
  name: 'retry-queue',
  enableRetries: true,
  maxRetries: 3,
  retryDelay: 1000,        // Base delay: 1 second
  retryMaxDelay: 10000,    // Max delay: 10 seconds
  retryJitter: true        // Add jitter to prevent thundering herd
});

queue.addMessage('Failing task');

const processor = async (message) => {
  // Simulate unreliable processing
  if (Math.random() < 0.7) {
    throw new Error('Processing failed');
  }
  return true;
};

await queue.processMessage(processor);
// Message will be retried with exponential backoff:
// Attempt 1: ~1 second delay
// Attempt 2: ~2 seconds delay  
// Attempt 3: ~4 seconds delay
// Uses @bernierllc/retry-policy for backoff calculation

Error Handling

The queue provides comprehensive error handling:

const result = queue.addMessage('test');
if (!result.success) {
  console.error('Failed to add message:', result.error);
}

const processResult = await queue.processMessage(processor);
if (!processResult.success) {
  console.error('Failed to process message:', processResult.error);
}

Performance Considerations

  • Memory Usage: Messages are stored in memory by default. For large queues, consider implementing persistence.
  • Processing Speed: The queue processes messages sequentially. For high-throughput scenarios, consider using multiple queues or workers.
  • Event Handlers: Keep event handlers lightweight to avoid blocking queue operations.

License

This file is licensed to the client under a limited-use license. The client may use and modify this code only within the scope of the project it was delivered for. Redistribution or use in other products or commercial offerings is not permitted without written consent from Bernier LLC.

Keywords

message-queue

FAQs

Package last updated on 18 Aug 2025

Did you know?

Socket

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Install

Related posts

SocketSocket SOC 2 Logo

Product

About

Packages

Stay in touch

Get open source security insights delivered straight into your inbox.

  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc

U.S. Patent No. 12,346,443 & 12,314,394. Other pending.