@message-queue-toolkit/core

Useful utilities, interfaces, and base classes for message queue handling. Currently supports AMQP and SQS behind a common abstraction.

Source: npm
Version: 24.2.0
Weekly downloads: 3.5K (35.91%)
Maintainers: 4

Core library for message-queue-toolkit. Provides foundational abstractions, utilities, and base classes for building message queue publishers and consumers.

Installation

npm install @message-queue-toolkit/core zod

Peer Dependencies:

  • zod - Schema validation

Overview

The core package provides the foundational building blocks used by all protocol-specific implementations (SQS, SNS, AMQP, Kafka, GCP Pub/Sub). It includes:

  • Base Classes: Abstract classes for publishers and consumers
  • Handler System: Type-safe message routing and handling
  • Validation: Zod schema validation and message parsing
  • Utilities: Retry logic, date handling, environment utilities
  • Testing: Handler spies for testing async message flows
  • Extensibility: Interfaces for payload stores, deduplication stores, and metrics

Core Concepts

Message Schemas

Messages are validated using Zod schemas. The library uses configurable field names:

  • messageTypeField (required): Field containing the message type discriminator (must be z.literal() for routing)
  • messageIdField (default: 'id'): Field containing the message ID
  • messageTimestampField (default: 'timestamp'): Field containing the timestamp

import { z } from 'zod'

const UserCreatedSchema = z.object({
  id: z.string(),
  type: z.literal('user.created'),  // Used for routing
  timestamp: z.string().datetime(),
  userId: z.string(),
  email: z.string().email(),
})

type UserCreated = z.infer<typeof UserCreatedSchema>
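
To make the role of the configurable field names concrete, here is a minimal sketch in plain TypeScript (no Zod) of how a consumer might read the three envelope fields from a raw message. `EnvelopeFieldConfig` and `readEnvelopeFields` are hypothetical names used for illustration only; they are not part of the library's API.

```typescript
// Hypothetical config shape mirroring the three field-name options above.
type EnvelopeFieldConfig = {
  messageTypeField: string
  messageIdField?: string
  messageTimestampField?: string
}

type EnvelopeFields = {
  type: unknown
  id: unknown
  timestamp: unknown
}

// Reads the envelope fields from a raw message, falling back to the
// documented defaults ('id' and 'timestamp') when no name is configured.
function readEnvelopeFields(
  message: Record<string, unknown>,
  config: EnvelopeFieldConfig,
): EnvelopeFields {
  return {
    type: message[config.messageTypeField],
    id: message[config.messageIdField ?? 'id'],
    timestamp: message[config.messageTimestampField ?? 'timestamp'],
  }
}

// A message that uses custom names for the type and timestamp fields.
const envelopeExample = readEnvelopeFields(
  { eventName: 'user.created', id: 'msg-1', createdAt: '2024-01-01T00:00:00Z' },
  { messageTypeField: 'eventName', messageTimestampField: 'createdAt' },
)
```

Here `envelopeExample.type` is read from `eventName`, while `id` falls back to the default field name.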

Handler Configuration

Use MessageHandlerConfigBuilder to configure handlers for different message types:

import { MessageHandlerConfigBuilder } from '@message-queue-toolkit/core'

const handlers = new MessageHandlerConfigBuilder<SupportedMessages, ExecutionContext>()
  .addConfig(
    UserCreatedSchema,
    async (message, context, preHandlingOutputs) => {
      await context.userService.createUser(message.userId)
      return { result: 'success' }
    }
  )
  .addConfig(
    UserUpdatedSchema,
    async (message, context, preHandlingOutputs) => {
      await context.userService.updateUser(message.userId, message.changes)
      return { result: 'success' }
    }
  )
  .build()
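
The builder example refers to `SupportedMessages` and `ExecutionContext` without defining them. One plausible shape, written as plain types so it stands alone, is sketched below. The `UserUpdatedMsg` fields and the `UserServiceSketch` interface are assumptions inferred from how the handlers use them; in a real project the message types would normally come from `z.infer` on the schemas.

```typescript
// Assumed message shapes; normally derived with z.infer<typeof Schema>.
type UserCreatedMsg = {
  id: string
  type: 'user.created'
  timestamp: string
  userId: string
  email: string
}

type UserUpdatedMsg = {
  id: string
  type: 'user.updated'
  timestamp: string
  userId: string
  changes: Record<string, unknown>
}

// Union of every message type the consumer handles.
type SupportedMessagesSketch = UserCreatedMsg | UserUpdatedMsg

// Hypothetical service the handlers call, inferred from the example above.
interface UserServiceSketch {
  createUser(userId: string): Promise<void>
  updateUser(userId: string, changes: Record<string, unknown>): Promise<void>
}

// Context object passed to every handler as its second argument.
type ExecutionContextSketch = {
  userService: UserServiceSketch
}

// A handler with the same shape as the first addConfig callback.
async function handleUserCreatedSketch(
  message: UserCreatedMsg,
  context: ExecutionContextSketch,
): Promise<{ result: 'success' }> {
  await context.userService.createUser(message.userId)
  return { result: 'success' }
}

// The literal `type` field lets TypeScript narrow the union per branch.
function describeMessage(message: SupportedMessagesSketch): string {
  switch (message.type) {
    case 'user.created':
      return `create ${message.userId} (${message.email})`
    case 'user.updated':
      return `update ${message.userId} (${Object.keys(message.changes).length} changes)`
  }
}
```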

Pre-handlers and Barriers

Pre-handlers are middleware functions that run before the main handler:

const preHandlers = [
  (message, context, output, next) => {
    // Enrich context, validate prerequisites, etc.
    output.logger = context.logger.child({ messageId: message.id })
    next({ result: 'success' })
  },
]

Barriers control whether a message should be processed or retried later:

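const preHandlerBarrier = async (message, context, preHandlerOutput) => {
  const prerequisiteMet = await checkPrerequisite(message)
  if (!prerequisiteMet) {
    // Not ready yet: the message will be retried later, and no output is returned
    return { isPassing: false }
  }
  // Barrier output is only provided when the barrier passes
  return { isPassing: true, output: { ready: true } }
}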
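
The order in which these pieces run can be sketched as a small pipeline: pre-handlers first (each calling `next`), then the barrier, then the main handler. This is a simplified synchronous model written for illustration, not the library's implementation. `runPipelineSketch` and its types are hypothetical, and mapping a failing barrier to a retry is an assumption based on the description above.

```typescript
type StepResult = { result: 'success' } | { error: 'retryLater' }

type PreHandlerSketch<M, C, O> = (
  message: M,
  context: C,
  output: O,
  next: (result: StepResult) => void,
) => void

type BarrierSketch<M, C, O> = (message: M, context: C, output: O) => { isPassing: boolean }

function runPipelineSketch<M, C, O>(
  message: M,
  context: C,
  output: O,
  preHandlers: PreHandlerSketch<M, C, O>[],
  barrier: BarrierSketch<M, C, O>,
  handler: (message: M, context: C, output: O) => StepResult,
): StepResult {
  // 1. Run pre-handlers in order; stop if one reports an error
  //    or never calls next().
  for (const preHandler of preHandlers) {
    const reported: StepResult[] = []
    preHandler(message, context, output, (result) => {
      reported.push(result)
    })
    const stepResult = reported[0]
    if (stepResult === undefined || 'error' in stepResult) {
      return { error: 'retryLater' }
    }
  }
  // 2. The barrier decides whether the message may be processed now.
  if (!barrier(message, context, output).isPassing) {
    return { error: 'retryLater' }
  }
  // 3. Only then does the main handler run.
  return handler(message, context, output)
}

const pipelineLog: string[] = []
const pipelineOutcome = runPipelineSketch(
  { id: 'msg-1' },
  {},
  { seen: [] as string[] },
  [
    (message, _context, output, next) => {
      output.seen.push(message.id)
      pipelineLog.push('preHandler')
      next({ result: 'success' })
    },
  ],
  () => {
    pipelineLog.push('barrier')
    return { isPassing: true }
  },
  () => {
    pipelineLog.push('handler')
    return { result: 'success' }
  },
)
```

After this runs, `pipelineLog` records the three stages in the order they executed.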

Handler Spies

Handler spies enable testing of async message flows:

// Enable in consumer/publisher options
{ handlerSpy: true }

// Wait for specific messages in tests
const result = await consumer.handlerSpy.waitForMessageWithId('msg-123', 'consumed', 5000)
expect(result.userId).toBe('user-456')
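
Conceptually, a spy of this kind records every processed message and lets a test wait until a given message ID shows up. The sketch below shows one way such a mechanism could work. `HandlerSpySketch` is a hypothetical class, not the library's implementation; the set of statuses is an assumption, and the timeout handling is left out to keep the example short.

```typescript
// Assumed status values; only 'consumed' appears in the example above.
type SpyStatusSketch = 'consumed' | 'published'

type SpyEntry<M> = { message: M; status: SpyStatusSketch }

type SpyWaiter<M> = {
  id: string
  status: SpyStatusSketch
  resolve: (entry: SpyEntry<M>) => void
}

class HandlerSpySketch<M extends { id: string }> {
  private readonly entries: SpyEntry<M>[] = []
  private waiters: SpyWaiter<M>[] = []

  // Called by the consumer or publisher once it is done with a message.
  record(message: M, status: SpyStatusSketch): void {
    const entry: SpyEntry<M> = { message, status }
    this.entries.push(entry)
    // Wake up every test that is already waiting for this id and status.
    const matches = (w: SpyWaiter<M>) => w.id === message.id && w.status === status
    const ready = this.waiters.filter(matches)
    this.waiters = this.waiters.filter((w) => !matches(w))
    for (const waiter of ready) {
      waiter.resolve(entry)
    }
  }

  // Synchronous lookup of an already-recorded message.
  find(id: string, status: SpyStatusSketch): SpyEntry<M> | undefined {
    return this.entries.find((e) => e.message.id === id && e.status === status)
  }

  // Resolves immediately if the message was already seen,
  // otherwise as soon as record() is called for it.
  waitFor(id: string, status: SpyStatusSketch): Promise<SpyEntry<M>> {
    const existing = this.find(id, status)
    if (existing) {
      return Promise.resolve(existing)
    }
    return new Promise((resolve) => {
      this.waiters.push({ id, status, resolve })
    })
  }
}
```

Because already-recorded messages resolve immediately, a test does not need to start waiting before the message is processed.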

Key Classes

AbstractQueueService

Base class for all queue services. Provides:

  • Message serialization/deserialization
  • Schema validation
  • Retry logic with exponential backoff
  • Payload offloading support
  • Message deduplication primitives
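
As an illustration of the retry bullet, exponential backoff generally means doubling the wait between attempts up to a cap. The following is a generic sketch of that calculation, not the library's code; `backoffDelayMs` and its option names are hypothetical.

```typescript
type BackoffOptionsSketch = { baseDelayMs: number; maxDelayMs: number }

// Delay before retry number `attempt` (0-based): base * 2^attempt, capped.
function backoffDelayMs(attempt: number, options: BackoffOptionsSketch): number {
  const uncapped = options.baseDelayMs * Math.pow(2, attempt)
  return Math.min(uncapped, options.maxDelayMs)
}

// Delays for the first six attempts with a 100 ms base and a 2 s cap.
const backoffSchedule = [0, 1, 2, 3, 4, 5].map((attempt) =>
  backoffDelayMs(attempt, { baseDelayMs: 100, maxDelayMs: 2000 }),
)
// backoffSchedule is [100, 200, 400, 800, 1600, 2000]
```

Production implementations often add random jitter on top of this so that many consumers do not retry at the same instant.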

MessageHandlerConfigBuilder

Fluent builder for configuring message handlers:

import { MessageHandlerConfigBuilder } from '@message-queue-toolkit/core'

const handlers = new MessageHandlerConfigBuilder<
  SupportedMessages,
  ExecutionContext,
  PrehandlerOutput
>()
  .addConfig(Schema1, handler1)
  .addConfig(Schema2, handler2, {
    preHandlers: [preHandler1, preHandler2],
    preHandlerBarrier: barrierFn,
    messageLogFormatter: (msg) => ({ id: msg.id }),
  })
  .build()

HandlerContainer

Routes messages to appropriate handlers based on message type:

import { HandlerContainer } from '@message-queue-toolkit/core'

const container = new HandlerContainer({
  messageHandlers: handlers,
  messageTypeField: 'type',
})

const handler = container.resolveHandler(message.type)
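
The routing idea behind a handler container can be sketched as a map from the value of the type field to a handler. This is a minimal stand-alone model for illustration; `HandlerRouterSketch` is a hypothetical class and does not reflect the internals of `HandlerContainer`.

```typescript
type RoutableMessage = Record<string, unknown>
type RoutedHandler = (message: RoutableMessage) => string

class HandlerRouterSketch {
  private readonly handlersByType = new Map<string, RoutedHandler>()
  private readonly messageTypeField: string

  constructor(messageTypeField: string) {
    this.messageTypeField = messageTypeField
  }

  register(type: string, handler: RoutedHandler): this {
    this.handlersByType.set(type, handler)
    return this
  }

  // Looks up the handler for the message's type value and fails loudly
  // on an unknown type instead of silently dropping the message.
  resolve(message: RoutableMessage): RoutedHandler {
    const type = message[this.messageTypeField]
    const handler = typeof type === 'string' ? this.handlersByType.get(type) : undefined
    if (!handler) {
      throw new Error(`Unsupported message type: ${String(type)}`)
    }
    return handler
  }
}

const routerSketch = new HandlerRouterSketch('type')
  .register('user.created', (m) => `created:${String(m['userId'])}`)
  .register('user.updated', (m) => `updated:${String(m['userId'])}`)

const routedMessage = { type: 'user.created', userId: 'user-123' }
const routedOutput = routerSketch.resolve(routedMessage)(routedMessage)
```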

MessageSchemaContainer

Manages Zod schemas and validates messages:

import { MessageSchemaContainer } from '@message-queue-toolkit/core'

const container = new MessageSchemaContainer({
  messageSchemas: [Schema1, Schema2],
  messageTypeField: 'type',
})

const schema = container.resolveSchema(message.type)

AbstractPublisherManager

Factory pattern for spawning publishers on demand:

import { AbstractPublisherManager } from '@message-queue-toolkit/core'

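// `publisherManager` is assumed to be an instance of a concrete subclass of
// AbstractPublisherManager, created elsewhere in the application.
// It automatically spawns publishers and fills in message metadata.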
await publisherManager.publish('user-events-topic', {
  type: 'user.created',
  userId: 'user-123',
})
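
The two behaviors mentioned here, spawning publishers on demand and filling in metadata, can be modeled with a small in-memory sketch. Everything below is hypothetical and synchronous for brevity: `PublisherManagerSketch` is not the library's class, and treating `id` and `timestamp` as the metadata that gets filled in is an assumption based on the default field names described earlier.

```typescript
type OutgoingMessage = { type: string; [key: string]: unknown }
type EnrichedMessage = { type: string; id: string; timestamp: string; [key: string]: unknown }

// Stand-in for a real queue or topic publisher.
class InMemoryPublisherSketch {
  readonly sent: EnrichedMessage[] = []

  publish(message: EnrichedMessage): void {
    this.sent.push(message)
  }
}

class PublisherManagerSketch {
  private readonly publishers = new Map<string, InMemoryPublisherSketch>()
  private readonly generateId: () => string
  private readonly now: () => Date

  // The id generator and clock are injected so the sketch is deterministic.
  constructor(generateId: () => string, now: () => Date) {
    this.generateId = generateId
    this.now = now
  }

  // Creates the publisher for a topic on first use and reuses it afterwards.
  private getOrSpawn(topic: string): InMemoryPublisherSketch {
    let publisher = this.publishers.get(topic)
    if (!publisher) {
      publisher = new InMemoryPublisherSketch()
      this.publishers.set(topic, publisher)
    }
    return publisher
  }

  // Adds the metadata fields, then hands the message to the topic's publisher.
  publish(topic: string, message: OutgoingMessage): EnrichedMessage {
    const enriched: EnrichedMessage = {
      ...message,
      id: this.generateId(),
      timestamp: this.now().toISOString(),
    }
    this.getOrSpawn(topic).publish(enriched)
    return enriched
  }

  publisherCount(): number {
    return this.publishers.size
  }
}
```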

DomainEventEmitter

Event emitter for domain events:

import { DomainEventEmitter } from '@message-queue-toolkit/core'

const emitter = new DomainEventEmitter()

emitter.on('user.created', async (event) => {
  console.log('User created:', event.userId)
})

await emitter.emit('user.created', { userId: 'user-123' })

Utilities

NO_MESSAGE_TYPE_FIELD

Use this constant when your consumer should accept all message types without routing:

import { MessageHandlerConfigBuilder, NO_MESSAGE_TYPE_FIELD } from '@message-queue-toolkit/core'

// Consumer options fragment: a single handler is used for all messages
const consumerOptions = {
  messageTypeField: NO_MESSAGE_TYPE_FIELD,
  handlers: new MessageHandlerConfigBuilder()
    .addConfig(PassthroughSchema, async (message) => {
      // Handles any message type
      return { result: 'success' }
    })
    .build(),
}
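
One way to picture the effect of such a sentinel: when no type field is configured, there is nothing to route on, so a single catch-all handler receives every message. The sketch below models that decision with a local symbol. `NO_TYPE_FIELD_SKETCH` is a hypothetical stand-in, since the actual value and type of `NO_MESSAGE_TYPE_FIELD` are not described here.

```typescript
// Hypothetical sentinel standing in for NO_MESSAGE_TYPE_FIELD.
const NO_TYPE_FIELD_SKETCH = Symbol('NO_MESSAGE_TYPE_FIELD')

type TypeFieldSetting = string | typeof NO_TYPE_FIELD_SKETCH
type AnyMessageHandler = (message: Record<string, unknown>) => string

function pickHandlerSketch(
  message: Record<string, unknown>,
  typeField: TypeFieldSetting,
  handlersByType: Map<string, AnyMessageHandler>,
  catchAll: AnyMessageHandler,
): AnyMessageHandler {
  // Sentinel configured: skip routing, one handler takes everything.
  if (typeof typeField !== 'string') {
    return catchAll
  }
  // Otherwise route on the value of the configured type field.
  const type = message[typeField]
  const handler = typeof type === 'string' ? handlersByType.get(type) : undefined
  if (!handler) {
    throw new Error(`Unsupported message type: ${String(type)}`)
  }
  return handler
}

const typedHandlers = new Map<string, AnyMessageHandler>([
  ['user.created', () => 'typed handler'],
])
const catchAllHandler: AnyMessageHandler = () => 'catch-all handler'

const anyMessage = { kind: 'anything', payload: 42 }
const passthroughChoice = pickHandlerSketch(anyMessage, NO_TYPE_FIELD_SKETCH, typedHandlers, catchAllHandler)(anyMessage)
```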

Error Classes

import {
  MessageValidationError,
  MessageInvalidFormatError,
  DoNotProcessMessageError,
  RetryMessageLaterError,
} from '@message-queue-toolkit/core'

// Validation failed
throw new MessageValidationError(zodError.issues)

// Message format is invalid (cannot parse)
throw new MessageInvalidFormatError({ message: 'Invalid JSON' })

// Do not process this message (skip without retry)
throw new DoNotProcessMessageError({ message: 'Duplicate message' })

// Retry this message later
throw new RetryMessageLaterError({ message: 'Dependency not ready' })
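
How a consumer loop might react to these errors can be sketched as a mapping from error kind to outcome. The classes below are local stand-ins so the example runs on its own; they are not the library's classes, and the outcome names (`ack`, `retry`, `discard`) are assumptions chosen for illustration. Only the two errors whose behavior is stated in the comments above are mapped; anything else is rethrown.

```typescript
// Local stand-ins for RetryMessageLaterError and DoNotProcessMessageError.
class RetryLaterSketchError extends Error {
  constructor(message: string) {
    super(message)
    this.name = 'RetryLaterSketchError'
  }
}

class DoNotProcessSketchError extends Error {
  constructor(message: string) {
    super(message)
    this.name = 'DoNotProcessSketchError'
  }
}

type OutcomeSketch = 'ack' | 'retry' | 'discard'

// Runs a handler and translates known errors into a queue-level outcome.
function processWithOutcome(handler: () => void): OutcomeSketch {
  try {
    handler()
    return 'ack'
  } catch (error) {
    if (error instanceof Error && error.name === 'RetryLaterSketchError') {
      return 'retry' // put the message back for a later attempt
    }
    if (error instanceof Error && error.name === 'DoNotProcessSketchError') {
      return 'discard' // skip the message without retrying
    }
    throw error // unknown errors are not swallowed
  }
}
```

Checking `error.name` rather than relying only on `instanceof` keeps the sketch working even when the code is compiled to an older JavaScript target, where subclassing `Error` can break the prototype chain.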

Message Deduplication

Interfaces for implementing deduplication stores:

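// AcquireLockOptions is assumed to be exported alongside the other deduplication types
import type {
  AcquireLockOptions,
  MessageDeduplicationStore,
  ReleasableLock,
} from '@message-queue-toolkit/core'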

// Implement custom deduplication store
class MyDeduplicationStore implements MessageDeduplicationStore {
  async keyExists(key: string): Promise<boolean> { /* ... */ }
  async setKey(key: string, ttlSeconds: number): Promise<void> { /* ... */ }
  async acquireLock(key: string, options: AcquireLockOptions): Promise<ReleasableLock> { /* ... */ }
}
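
To show what the `keyExists` and `setKey` pair is for, here is a small in-memory sketch of TTL-based deduplication. It is deliberately synchronous and does not implement the `MessageDeduplicationStore` interface; `InMemoryDedupSketch` and `shouldProcessSketch` are hypothetical names, and the clock is injected so the behavior is deterministic.

```typescript
class InMemoryDedupSketch {
  private readonly expiresAtByKey = new Map<string, number>()
  private readonly nowMs: () => number

  constructor(nowMs: () => number) {
    this.nowMs = nowMs
  }

  // A key exists only until its TTL has elapsed.
  keyExists(key: string): boolean {
    const expiresAt = this.expiresAtByKey.get(key)
    if (expiresAt === undefined) {
      return false
    }
    if (expiresAt <= this.nowMs()) {
      this.expiresAtByKey.delete(key)
      return false
    }
    return true
  }

  setKey(key: string, ttlSeconds: number): void {
    this.expiresAtByKey.set(key, this.nowMs() + ttlSeconds * 1000)
  }
}

// Returns true when the message should be processed, false for a duplicate
// seen within the TTL window.
function shouldProcessSketch(
  store: InMemoryDedupSketch,
  messageId: string,
  ttlSeconds: number,
): boolean {
  if (store.keyExists(messageId)) {
    return false
  }
  store.setKey(messageId, ttlSeconds)
  return true
}
```

The check-then-set sequence here is not atomic. With a shared store and several consumers, two of them could both pass the check, which is presumably why the interface also exposes `acquireLock`.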

Payload Offloading

Interfaces for implementing payload stores:

import type { PayloadStore, PayloadStoreConfig } from '@message-queue-toolkit/core'

// Implement custom payload store
class MyPayloadStore implements PayloadStore {
  async storePayload(payload: Buffer, messageId: string): Promise<PayloadRef> { /* ... */ }
  async retrievePayload(ref: PayloadRef): Promise<Buffer> { /* ... */ }
}
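
Payload offloading usually follows a simple pattern: if the serialized message exceeds a size limit, store it elsewhere and send a small pointer instead; on receipt, follow the pointer. The sketch below models that round trip in memory. All names, the pointer shape, and the use of string length as the size measure are assumptions for illustration, not the library's format.

```typescript
// Hypothetical pointer that travels through the queue instead of the payload.
type PayloadPointerSketch = { offloadedPayloadKey: string; sizeChars: number }

type OffloadableMessage = { id: string; [key: string]: unknown }

class InMemoryPayloadStoreSketch {
  private readonly blobs = new Map<string, string>()

  store(serialized: string, messageId: string): string {
    const key = `payloads/${messageId}`
    this.blobs.set(key, serialized)
    return key
  }

  retrieve(key: string): string {
    const blob = this.blobs.get(key)
    if (blob === undefined) {
      throw new Error(`Payload not found: ${key}`)
    }
    return blob
  }
}

// Sender side: small messages go inline, large ones become a pointer.
function prepareForSendSketch(
  message: OffloadableMessage,
  store: InMemoryPayloadStoreSketch,
  maxInlineChars: number,
): string {
  const serialized = JSON.stringify(message)
  if (serialized.length <= maxInlineChars) {
    return serialized
  }
  const pointer: PayloadPointerSketch = {
    offloadedPayloadKey: store.store(serialized, message.id),
    sizeChars: serialized.length,
  }
  return JSON.stringify(pointer)
}

// Receiver side: resolve the pointer if there is one.
function restoreAfterReceiveSketch(body: string, store: InMemoryPayloadStoreSketch): unknown {
  const parsed: unknown = JSON.parse(body)
  if (typeof parsed === 'object' && parsed !== null && 'offloadedPayloadKey' in parsed) {
    const key = (parsed as PayloadPointerSketch).offloadedPayloadKey
    return JSON.parse(store.retrieve(key))
  }
  return parsed
}
```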

API Reference

Types

// Handler result type
type HandlerResult = Either<'retryLater', 'success'>

// Pre-handler signature
type Prehandler<Message, Context, Output> = (
  message: Message,
  context: Context,
  output: Output,
  next: (result: PrehandlerResult) => void
) => void

// Barrier signature
type BarrierCallback<Message, Context, PrehandlerOutput, BarrierOutput> = (
  message: Message,
  context: Context,
  preHandlerOutput: PrehandlerOutput
) => Promise<BarrierResult<BarrierOutput>>

// Barrier result
type BarrierResult<Output> =
  | { isPassing: true; output: Output }
  | { isPassing: false; output?: never }
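
The union types above are meant to be narrowed before use. The following sketch shows that narrowing with local copies of the types. `EitherSketch` is an assumed definition in which exactly one of `error` or `result` is set, which matches the `{ result: 'success' }` values returned by the handlers earlier; the source of the real `Either` type is not stated here.

```typescript
// Assumed shape of Either: exactly one of `error` or `result` is present.
type EitherSketch<E, R> = { error: E; result?: never } | { error?: never; result: R }

type HandlerResultSketch = EitherSketch<'retryLater', 'success'>

type BarrierResultSketch<Output> =
  | { isPassing: true; output: Output }
  | { isPassing: false; output?: never }

function describeHandlerResult(outcome: HandlerResultSketch): string {
  if (outcome.error !== undefined) {
    return `retry (${outcome.error})`
  }
  return `done (${outcome.result})`
}

// Checking isPassing narrows the union, so `output` is only read
// on the branch where it is guaranteed to exist.
function barrierOutputOrNull<Output>(barrier: BarrierResultSketch<Output>): Output | null {
  return barrier.isPassing ? barrier.output : null
}
```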

Utility Functions

// Environment utilities
isProduction(): boolean
reloadConfig(): void

// Date utilities
isRetryDateExceeded(timestamp: string | Date, maxRetryDuration: number): boolean

// Message parsing
parseMessage<T>(data: unknown, schema: ZodSchema<T>): ParseMessageResult<T>

// Wait utilities
waitAndRetry<T>(fn: () => Promise<T>, options: WaitAndRetryOptions): Promise<T>

// Object utilities
objectMatches(obj: unknown, pattern: unknown): boolean
isShallowSubset(subset: object, superset: object): boolean

Keywords

message

Package last updated on 04 Dec 2025
