
@message-queue-toolkit/core
Useful utilities, interfaces, and base classes for message queue handling. Currently supports AMQP and SQS with a common abstraction on top.
Core library for message-queue-toolkit. Provides foundational abstractions, utilities, and base classes for building message queue publishers and consumers.
npm install @message-queue-toolkit/core zod
Peer Dependencies:
zod - Schema validation
The core package provides the foundational building blocks used by all protocol-specific implementations (SQS, SNS, AMQP, Kafka, GCP Pub/Sub). It includes:
Messages are validated using Zod schemas. The library uses configurable field names:
messageTypeResolver: Configuration for resolving the message type discriminator (see Message Type Resolution)
messageIdField (default: 'id'): Field containing the message ID
messageTimestampField (default: 'timestamp'): Field containing the timestamp
import { z } from 'zod'
const UserCreatedSchema = z.object({
id: z.string(),
type: z.literal('user.created'), // Used for routing
timestamp: z.string().datetime(),
userId: z.string(),
email: z.string().email(),
})
type UserCreated = z.infer<typeof UserCreatedSchema>
The message type is a discriminator field that identifies what kind of event or command a message represents. It's used for:
In a typical event-driven architecture, a single queue or topic may receive multiple types of messages. For example, a user-events queue might receive user.created, user.updated, and user.deleted events. The message type tells the consumer which handler should process each message.
The messageTypeResolver configuration supports three modes:
Use when the message type is a field in the parsed message body. Supports dot notation for nested paths:
{
messageTypeResolver: { messageTypePath: 'type' }, // Extracts from message.type
}
// Nested path example
{
messageTypeResolver: { messageTypePath: 'metadata.eventType' }, // Extracts from message.metadata.eventType
}
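To make the dot-notation lookup concrete, here is a minimal sketch of how a path such as metadata.eventType could be resolved against a parsed message. getByPath is a hypothetical helper written for this illustration; it is not part of the library's API, and the library's own lookup may differ.

```typescript
// Hypothetical helper, not part of @message-queue-toolkit/core.
// Walks a dot-separated path through nested objects and returns the value found there.
function getByPath(data: unknown, path: string): unknown {
  let current: unknown = data
  for (const key of path.split('.')) {
    // Stop as soon as an intermediate value is not an object
    if (typeof current !== 'object' || current === null) return undefined
    current = (current as Record<string, unknown>)[key]
  }
  return current
}

const sampleMessage = {
  id: 'msg-123',
  metadata: { eventType: 'user.created' },
}

const rootLevel = getByPath({ type: 'order.created' }, 'type') // 'order.created'
const nested = getByPath(sampleMessage, 'metadata.eventType') // 'user.created'
const missing = getByPath(sampleMessage, 'metadata.missing.deeper') // undefined
```

A key that contains a hyphen, such as detail-type, works with this approach because only dots act as separators.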
Use when all messages are of the same type:
{
messageTypeResolver: { literal: 'order.created' }, // All messages treated as this type
}
Use for complex scenarios where the type needs to be extracted from message attributes, nested fields, or requires transformation:
import type { MessageTypeResolverConfig } from '@message-queue-toolkit/core'
const resolverConfig: MessageTypeResolverConfig = {
resolver: ({ messageData, messageAttributes }) => {
// Your custom logic here
return 'resolved.type'
},
}
Important: The resolver function must always return a valid string. If the type cannot be determined, either return a default type or throw an error with a descriptive message.
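One way to enforce this rule is to wrap the resolver so that an empty or non-string result fails loudly. The sketch below is an assumption-laden illustration: withValidatedResult and the fallback type 'unknown.message' are hypothetical names, and the two local types only mirror the resolver types described in this README.

```typescript
// Local copies of the resolver types described in this README, so the sketch is self-contained.
type ResolverContext = {
  messageData: unknown
  messageAttributes?: Record<string, unknown>
}
type ResolverFn = (context: ResolverContext) => string

// Hypothetical wrapper: rejects empty or non-string results with a descriptive error.
function withValidatedResult(resolver: ResolverFn): ResolverFn {
  return (context) => {
    const resolved: unknown = resolver(context)
    if (typeof resolved !== 'string' || resolved.length === 0) {
      throw new Error(`Message type resolver returned an invalid type: ${JSON.stringify(resolved)}`)
    }
    return resolved
  }
}

// Example resolver that falls back to a default type instead of returning undefined.
const resolveWithDefault = withValidatedResult(({ messageData }) => {
  const data = messageData as { type?: string } | null
  return data?.type ?? 'unknown.message'
})

const known = resolveWithDefault({ messageData: { type: 'order.created' } }) // 'order.created'
const fallback = resolveWithDefault({ messageData: {} }) // 'unknown.message'
```

Whether to fall back to a default or to throw depends on the queue: a default type needs a matching handler, while throwing surfaces malformed messages immediately.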
When publishing your own events directly to SQS, you control the message format:
// Message format you control
{
"id": "msg-123",
"type": "order.created", // Your type field
"timestamp": "2024-01-15T10:30:00Z",
"payload": {
"orderId": "order-456",
"amount": 99.99
}
}
// Configuration
{
messageTypeResolver: { messageTypePath: 'type' },
}
EventBridge events have a specific structure with detail-type:
// EventBridge event structure delivered to SQS
{
"version": "0",
"id": "12345678-1234-1234-1234-123456789012",
"detail-type": "Order Created", // EventBridge uses detail-type
"source": "com.myapp.orders",
"account": "123456789012",
"time": "2024-01-15T10:30:00Z",
"region": "us-east-1",
"detail": {
"orderId": "order-456",
"amount": 99.99
}
}
// Configuration
{
messageTypeResolver: { messageTypePath: 'detail-type' },
}
// Or with resolver for normalization
{
messageTypeResolver: {
resolver: ({ messageData }) => {
const data = messageData as { 'detail-type'?: string; source?: string }
const detailType = data['detail-type']
if (!detailType) throw new Error('detail-type is required')
// Optionally normalize: "Order Created" → "order.created"
return detailType.toLowerCase().replace(/ /g, '.')
},
},
}
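The normalization in the resolver above replaces each single space with a dot, so a detail-type with irregular spacing would yield empty segments. A slightly more defensive variant could look like the following sketch; normalizeDetailType is a hypothetical helper name, not something the library provides.

```typescript
// Hypothetical helper: turns an EventBridge detail-type such as "Order Created" into "order.created".
// Runs of whitespace are collapsed so irregular spacing does not produce empty segments.
function normalizeDetailType(detailType: string): string {
  return detailType.trim().toLowerCase().split(/\s+/).join('.')
}

const normalized = normalizeDetailType('Order Created') // 'order.created'
const irregular = normalizeDetailType('  Order   Created ') // 'order.created'
```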
SNS messages delivered through SQS carry the actual payload in the Message field of the SNS envelope. The library unwraps this envelope automatically:
// After SNS envelope unwrapping, you get your original message
{
"id": "msg-123",
"type": "user.signup.completed",
"userId": "user-789",
"email": "user@example.com"
}
// Configuration
{
messageTypeResolver: { messageTypePath: 'type' },
}
Kafka typically uses topic-based routing, but you may still need message types within a topic:
// Kafka message value (JSON)
{
"eventType": "inventory.reserved",
"eventId": "evt-123",
"timestamp": 1705312200000,
"data": {
"sku": "PROD-001",
"quantity": 5
}
}
// Configuration
{
messageTypeResolver: { messageTypePath: 'eventType' },
}
// Or using Kafka headers (via custom resolver)
{
messageTypeResolver: {
resolver: ({ messageData, messageAttributes }) => {
// Kafka headers are passed as messageAttributes
if (messageAttributes?.['ce_type']) {
return messageAttributes['ce_type'] as string // CloudEvents header
}
const data = messageData as { eventType?: string }
if (!data.eventType) throw new Error('eventType required')
return data.eventType
},
},
}
When you control the message format in Pub/Sub:
// Your message (base64-decoded from data field)
{
"type": "payment.processed",
"paymentId": "pay-123",
"amount": 150.00,
"currency": "USD"
}
// Configuration
{
messageTypeResolver: { messageTypePath: 'type' },
}
Cloud Storage notifications put the event type in message attributes, not the data payload:
// Pub/Sub message structure for Cloud Storage notifications
{
"data": "eyJraW5kIjoic3RvcmFnZSMgb2JqZWN0In0=", // Base64-encoded object metadata
"attributes": {
"eventType": "OBJECT_FINALIZE", // Type is HERE, not in data!
"bucketId": "my-bucket",
"objectId": "path/to/file.jpg",
"objectGeneration": "1705312200000"
},
"messageId": "123456789",
"publishTime": "2024-01-15T10:30:00Z"
}
// Configuration - must use resolver to access attributes
{
messageTypeResolver: {
resolver: ({ messageAttributes }) => {
const eventType = messageAttributes?.eventType as string
if (!eventType) {
throw new Error('eventType attribute required for Cloud Storage notifications')
}
// Map GCS event types to your internal types
const typeMap: Record<string, string> = {
'OBJECT_FINALIZE': 'storage.object.created',
'OBJECT_DELETE': 'storage.object.deleted',
'OBJECT_ARCHIVE': 'storage.object.archived',
'OBJECT_METADATA_UPDATE': 'storage.object.metadataUpdated',
}
return typeMap[eventType] ?? eventType
},
},
}
Eventarc delivers events in CloudEvents format:
// CloudEvents structured format
{
"specversion": "1.0",
"type": "google.cloud.storage.object.v1.finalized", // CloudEvents type
"source": "//storage.googleapis.com/projects/_/buckets/my-bucket",
"id": "1234567890",
"time": "2024-01-15T10:30:00Z",
"datacontenttype": "application/json",
"data": {
"bucket": "my-bucket",
"name": "path/to/file.jpg",
"contentType": "image/jpeg"
}
}
// Configuration
{
messageTypeResolver: { messageTypePath: 'type' }, // CloudEvents type is at root level
}
// Or with mapping to simpler types
{
messageTypeResolver: {
resolver: ({ messageData }) => {
const data = messageData as { type?: string }
const ceType = data.type
if (!ceType) throw new Error('CloudEvents type required')
// Map verbose CloudEvents types to simpler names
if (ceType.includes('storage.object') && ceType.includes('finalized')) {
return 'storage.object.created'
}
if (ceType.includes('storage.object') && ceType.includes('deleted')) {
return 'storage.object.deleted'
}
return ceType
},
},
}
When a queue/subscription only ever receives one type of message, use literal:
// Dedicated queue for order.created events only
{
messageTypeResolver: {
literal: 'order.created',
},
}
This is useful for:
Use MessageHandlerConfigBuilder to configure handlers for different message types:
import { MessageHandlerConfigBuilder } from '@message-queue-toolkit/core'
const handlers = new MessageHandlerConfigBuilder<SupportedMessages, ExecutionContext>()
.addConfig(
UserCreatedSchema,
async (message, context, preHandlingOutputs) => {
await context.userService.createUser(message.userId)
return { result: 'success' }
}
)
.addConfig(
UserUpdatedSchema,
async (message, context, preHandlingOutputs) => {
await context.userService.updateUser(message.userId, message.changes)
return { result: 'success' }
}
)
.build()
Pre-handlers are middleware functions that run before the main handler:
const preHandlers = [
(message, context, output, next) => {
// Enrich context, validate prerequisites, etc.
output.logger = context.logger.child({ messageId: message.id })
next({ result: 'success' })
},
]
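To illustrate how a chain of pre-handlers cooperates through next, here is a minimal sketch of a runner that invokes them in order and stops at the first error. runPrehandlers and the Sketch* types are hypothetical names for this example; the library's internal runner and the exact shape of its pre-handler result may differ.

```typescript
// Local stand-ins for the types in this README; the names are assumptions for this sketch.
type SketchPrehandlerResult = { error?: Error; result?: 'success' }
type SketchPrehandler<M, C, O> = (
  message: M,
  context: C,
  output: O,
  next: (result: SketchPrehandlerResult) => void,
) => void

// Hypothetical runner: calls each pre-handler in order and stops at the first error.
function runPrehandlers<M, C, O>(
  preHandlers: SketchPrehandler<M, C, O>[],
  message: M,
  context: C,
  output: O,
  done: (error?: Error) => void,
): void {
  const step = (index: number): void => {
    const current = preHandlers[index]
    if (!current) {
      done() // every pre-handler has called next() without an error
      return
    }
    current(message, context, output, (result) => {
      if (result.error) {
        done(result.error)
        return
      }
      step(index + 1)
    })
  }
  step(0)
}

type DemoMessage = { id: string }
type DemoOutput = { trace: string[]; messageId?: string }

const demoOutput: DemoOutput = { trace: [] }
const demoState: { finished: boolean; error?: Error } = { finished: false }

runPrehandlers<DemoMessage, object, DemoOutput>(
  [
    (message, _context, output, next) => {
      output.messageId = message.id // enrich the shared output
      output.trace.push('first')
      next({ result: 'success' })
    },
    (_message, _context, output, next) => {
      output.trace.push('second')
      next({ result: 'success' })
    },
  ],
  { id: 'msg-123' },
  {},
  demoOutput,
  (error) => {
    demoState.error = error
    demoState.finished = true
  },
)
```

Because every pre-handler receives the same output object, values written by an earlier one are visible to later ones and, eventually, to the handler.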
Barriers control whether a message should be processed or retried later:
const preHandlerBarrier = async (message, context, preHandlerOutput) => {
const prerequisiteMet = await checkPrerequisite(message)
if (!prerequisiteMet) {
// Not ready yet: the message will be retried later
return { isPassing: false }
}
return {
isPassing: true,
output: { ready: true },
}
}
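The following sketch shows one way a consumer could act on a barrier result: run the handler when the barrier passes, and request a retry otherwise. decideAfterBarrier is a hypothetical function for illustration only; the result type is a local copy of the BarrierResult shape described in this README.

```typescript
// Local copy of the BarrierResult shape from the Types section of this README.
type SketchBarrierResult<Output> =
  | { isPassing: true; output: Output }
  | { isPassing: false; output?: never }

// Hypothetical decision step: run the handler when the barrier passes, otherwise ask for a retry.
function decideAfterBarrier<Output>(
  barrierResult: SketchBarrierResult<Output>,
  handle: (barrierOutput: Output) => 'success',
): 'success' | 'retryLater' {
  if (!barrierResult.isPassing) {
    return 'retryLater' // prerequisite missing; the message should be attempted again later
  }
  return handle(barrierResult.output)
}

const passed = decideAfterBarrier({ isPassing: true, output: { ready: true } }, () => 'success')
const blocked = decideAfterBarrier<{ ready: boolean }>({ isPassing: false }, () => 'success')
```

Here passed is 'success' and blocked is 'retryLater'. The barrier output is only available in the passing branch, which matches the discriminated union above.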
Handler spies enable testing of async message flows:
// Enable in consumer/publisher options
{ handlerSpy: true }
// Wait for specific messages in tests
const result = await consumer.handlerSpy.waitForMessageWithId('msg-123', 'consumed', 5000)
expect(result.userId).toBe('user-456')
Base class for all queue services. Provides:
Fluent builder for configuring message handlers:
import { MessageHandlerConfigBuilder } from '@message-queue-toolkit/core'
const handlers = new MessageHandlerConfigBuilder<
SupportedMessages,
ExecutionContext,
PrehandlerOutput
>()
.addConfig(Schema1, handler1)
.addConfig(Schema2, handler2, {
preHandlers: [preHandler1, preHandler2],
preHandlerBarrier: barrierFn,
messageLogFormatter: (msg) => ({ id: msg.id }),
})
.build()
The third parameter to addConfig accepts these options:
| Option | Type | Description |
|---|---|---|
| messageType | string | Explicit message type for routing. Required when using custom resolver. |
| messageLogFormatter | (message) => unknown | Custom formatter for logging |
| preHandlers | Prehandler[] | Middleware functions run before the handler |
| preHandlerBarrier | BarrierCallback | Barrier function for out-of-order message handling |
When using a custom resolver function (messageTypeResolver: { resolver: fn }), the message type cannot be automatically extracted from schemas at registration time. You must provide an explicit messageType for each handler:
const handlers = new MessageHandlerConfigBuilder<SupportedMessages, Context>()
.addConfig(
STORAGE_OBJECT_SCHEMA,
handleObjectCreated,
{ messageType: 'storage.object.created' } // Required for custom resolver
)
.addConfig(
STORAGE_DELETE_SCHEMA,
handleObjectDeleted,
{ messageType: 'storage.object.deleted' } // Required for custom resolver
)
.build()
const container = new HandlerContainer({
messageHandlers: handlers,
messageTypeResolver: {
resolver: ({ messageAttributes }) => {
// Map external event types to your internal types
const eventType = messageAttributes?.eventType as string
if (eventType === 'OBJECT_FINALIZE') return 'storage.object.created'
if (eventType === 'OBJECT_DELETE') return 'storage.object.deleted'
throw new Error(`Unknown event type: ${eventType}`)
},
},
})
Priority for determining handler message type:
messageType in handler options (highest priority)
messageTypeResolver: { literal: 'type' }
messageTypePath
If the message type cannot be determined, an error is thrown during container construction.
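The priority order above can be sketched as a small registration-time function. This is an illustration under stated assumptions: determineHandlerType is a hypothetical name, and schemaLiteral stands for whatever literal value is read from the handler's schema at messageTypePath, which is an assumption about how the third step works.

```typescript
// Local stand-in for the three resolver modes; the field names mirror this README's configuration.
type SketchResolverConfig =
  | { messageTypePath: string }
  | { literal: string }
  | { resolver: (context: { messageData: unknown }) => string }

// Hypothetical registration-time lookup following the priority order listed above.
// `schemaLiteral` stands for the literal read from the schema at messageTypePath (an assumption).
function determineHandlerType(
  handlerOptions: { messageType?: string },
  resolverConfig: SketchResolverConfig,
  schemaLiteral?: string,
): string {
  if (handlerOptions.messageType) return handlerOptions.messageType // 1. explicit option
  if ('literal' in resolverConfig) return resolverConfig.literal // 2. literal resolver
  if ('messageTypePath' in resolverConfig && schemaLiteral) return schemaLiteral // 3. schema field
  throw new Error('Unable to determine message type for handler')
}

const explicitWins = determineHandlerType(
  { messageType: 'storage.object.created' },
  { literal: 'order.created' },
) // 'storage.object.created'
const literalType = determineHandlerType({}, { literal: 'order.created' }) // 'order.created'
const fromSchema = determineHandlerType({}, { messageTypePath: 'type' }, 'user.created') // 'user.created'
```

With a custom resolver and no explicit messageType, none of the three steps applies and the function throws, which mirrors the error described above.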
Routes messages to appropriate handlers based on message type:
import { HandlerContainer } from '@message-queue-toolkit/core'
const container = new HandlerContainer({
messageHandlers: handlers,
messageTypeResolver: { messageTypePath: 'type' },
})
const handler = container.resolveHandler(message.type)
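Conceptually, type-based routing comes down to a lookup from message type to handler. The miniature below is a sketch of that idea only; TinyHandlerRegistry is a hypothetical class and is not how HandlerContainer is implemented.

```typescript
type RoutedMessage = { type: string }
type RoutedHandler = (message: RoutedMessage) => string

// Hypothetical miniature of type-based routing; not the library's HandlerContainer.
class TinyHandlerRegistry {
  private readonly handlersByType = new Map<string, RoutedHandler>()

  register(messageType: string, handler: RoutedHandler): this {
    this.handlersByType.set(messageType, handler)
    return this
  }

  resolveHandler(messageType: string): RoutedHandler {
    const handler = this.handlersByType.get(messageType)
    if (!handler) throw new Error(`Unsupported message type: ${messageType}`)
    return handler
  }
}

const registry = new TinyHandlerRegistry()
  .register('user.created', (message) => `created:${message.type}`)
  .register('user.updated', (message) => `updated:${message.type}`)

const incoming: RoutedMessage = { type: 'user.updated' }
const routed = registry.resolveHandler(incoming.type)(incoming) // 'updated:user.updated'
```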
Manages Zod schemas and validates messages:
import { MessageSchemaContainer } from '@message-queue-toolkit/core'
const container = new MessageSchemaContainer({
messageSchemas: [{ schema: Schema1 }, { schema: Schema2 }],
messageDefinitions: [],
messageTypeResolver: { messageTypePath: 'type' },
})
const result = container.resolveSchema(message)
if ('error' in result) {
// Handle error
} else {
const schema = result.result
}
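The error-or-result shape used above can be illustrated without the library. In this sketch, SketchEither, lookupSchemaName, and the schema names are all hypothetical stand-ins; the real container works with Zod schemas rather than strings.

```typescript
// Local stand-in for an Either-style result: exactly one of `error` or `result` is present.
type SketchEither<E, R> = { error: E } | { result: R }

// Hypothetical lookup: returns the schema name registered for a message type, or an error.
function lookupSchemaName(
  schemaNamesByType: Map<string, string>,
  messageType: string,
): SketchEither<Error, string> {
  const schemaName = schemaNamesByType.get(messageType)
  if (schemaName === undefined) {
    return { error: new Error(`Unsupported message type: ${messageType}`) }
  }
  return { result: schemaName }
}

const schemaNames = new Map([['user.created', 'UserCreatedSchema']])

function describeLookup(messageType: string): string {
  const lookup = lookupSchemaName(schemaNames, messageType)
  // The `in` check narrows the union, so each branch sees only the field it can use.
  return 'error' in lookup ? `error: ${lookup.error.message}` : `schema: ${lookup.result}`
}

const foundSchema = describeLookup('user.created') // 'schema: UserCreatedSchema'
const missingSchema = describeLookup('user.deleted') // 'error: Unsupported message type: user.deleted'
```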
Factory pattern for spawning publishers on demand:
import { AbstractPublisherManager } from '@message-queue-toolkit/core'
// Automatically spawns publishers and fills metadata
await publisherManager.publish('user-events-topic', {
type: 'user.created',
userId: 'user-123',
})
Event emitter for domain events:
import { DomainEventEmitter } from '@message-queue-toolkit/core'
const emitter = new DomainEventEmitter()
emitter.on('user.created', async (event) => {
console.log('User created:', event.userId)
})
await emitter.emit('user.created', { userId: 'user-123' })
import {
MessageValidationError,
MessageInvalidFormatError,
DoNotProcessMessageError,
RetryMessageLaterError,
} from '@message-queue-toolkit/core'
// Validation failed
throw new MessageValidationError(zodError.issues)
// Message format is invalid (cannot parse)
throw new MessageInvalidFormatError({ message: 'Invalid JSON' })
// Do not process this message (skip without retry)
throw new DoNotProcessMessageError({ message: 'Duplicate message' })
// Retry this message later
throw new RetryMessageLaterError({ message: 'Dependency not ready' })
Interfaces for implementing deduplication stores:
import type { MessageDeduplicationStore, ReleasableLock } from '@message-queue-toolkit/core'
// Implement custom deduplication store
class MyDeduplicationStore implements MessageDeduplicationStore {
async keyExists(key: string): Promise<boolean> { /* ... */ }
async setKey(key: string, ttlSeconds: number): Promise<void> { /* ... */ }
async acquireLock(key: string, options: AcquireLockOptions): Promise<ReleasableLock> { /* ... */ }
}
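As a rough illustration of the key operations, the sketch below keeps keys in memory with a time-to-live. InMemoryKeyStore is a hypothetical class that covers only keyExists and setKey; it does not implement locking and does not claim to satisfy the full MessageDeduplicationStore interface.

```typescript
// Hypothetical in-memory store covering only the key operations (keyExists, setKey).
// Locking is left out; a production store would normally use a shared backend instead of process memory.
class InMemoryKeyStore {
  private readonly expiresAtByKey = new Map<string, number>()
  private readonly now: () => number

  // `now` is injectable so expiry can be exercised without waiting.
  constructor(now: () => number = () => Date.now()) {
    this.now = now
  }

  async keyExists(key: string): Promise<boolean> {
    const expiresAt = this.expiresAtByKey.get(key)
    if (expiresAt === undefined) return false
    if (expiresAt <= this.now()) {
      this.expiresAtByKey.delete(key) // lazily drop expired entries
      return false
    }
    return true
  }

  async setKey(key: string, ttlSeconds: number): Promise<void> {
    this.expiresAtByKey.set(key, this.now() + ttlSeconds * 1000)
  }
}

// Usage with a controllable clock: the key is visible until its TTL elapses.
async function demoDeduplication(): Promise<boolean[]> {
  let fakeNow = 0
  const store = new InMemoryKeyStore(() => fakeNow)
  await store.setKey('msg-123', 60)
  const beforeExpiry = await store.keyExists('msg-123')
  fakeNow = 60000 // advance the clock by 60 seconds
  const afterExpiry = await store.keyExists('msg-123')
  return [beforeExpiry, afterExpiry] // [true, false]
}
```

An in-memory map is only suitable for tests or single-process experiments, since separate consumer instances would not share state.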
Interfaces for implementing payload stores:
import type { PayloadStore, PayloadStoreConfig } from '@message-queue-toolkit/core'
// Implement custom payload store
class MyPayloadStore implements PayloadStore {
async storePayload(payload: Buffer, messageId: string): Promise<PayloadRef> { /* ... */ }
async retrievePayload(ref: PayloadRef): Promise<Buffer> { /* ... */ }
}
// Handler result type
type HandlerResult = Either<'retryLater', 'success'>
// Pre-handler signature
type Prehandler<Message, Context, Output> = (
message: Message,
context: Context,
output: Output,
next: (result: PrehandlerResult) => void
) => void
// Barrier signature
type BarrierCallback<Message, Context, PrehandlerOutput, BarrierOutput> = (
message: Message,
context: Context,
preHandlerOutput: PrehandlerOutput
) => Promise<BarrierResult<BarrierOutput>>
// Barrier result
type BarrierResult<Output> =
| { isPassing: true; output: Output }
| { isPassing: false; output?: never }
// Message type resolver context
type MessageTypeResolverContext = {
messageData: unknown
messageAttributes?: Record<string, unknown>
}
// Message type resolver function
type MessageTypeResolverFn = (context: MessageTypeResolverContext) => string
// Message type resolver configuration
type MessageTypeResolverConfig =
| { messageTypePath: string } // Extract from a field in the message data (dot notation for nested paths)
| { literal: string } // Constant type for all messages
| { resolver: MessageTypeResolverFn } // Custom resolver function
// Environment utilities
isProduction(): boolean
reloadConfig(): void
// Date utilities
isRetryDateExceeded(timestamp: string | Date, maxRetryDuration: number): boolean
// Message parsing
parseMessage<T>(data: unknown, schema: ZodSchema<T>): ParseMessageResult<T>
// Wait utilities
waitAndRetry<T>(fn: () => Promise<T>, options: WaitAndRetryOptions): Promise<T>
// Object utilities
objectMatches(obj: unknown, pattern: unknown): boolean
isShallowSubset(subset: object, superset: object): boolean
FAQs
Useful utilities, interfaces and base classes for message queue handling. Supports AMQP and SQS with a common abstraction on top currently
The npm package @message-queue-toolkit/core receives a total of 2,889 weekly downloads. As such, @message-queue-toolkit/core popularity was classified as popular.
We found that @message-queue-toolkit/core demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 4 open source maintainers collaborating on the project.