@vcita/event-bus-nestjs
A comprehensive NestJS module for publishing and subscribing to standardized events via RabbitMQ/AMQP with built-in tracing, retry mechanisms, and structured event formatting.
Features
✅ Standardized Event Publishing: Automatically structures events with headers (timestamps, trace IDs, actor info)
✅ Flexible Event Subscription: Subscribe to events using decorators with pattern matching
✅ AMQP Connection Management: Handles RabbitMQ connections, queues, and exchanges automatically
✅ Distributed Tracing: Built-in support for tracing across services
✅ Retry Mechanisms: Configurable retry logic with exponential backoff
✅ Error Handling: Comprehensive error handling with dead letter queues
✅ Legacy Support: Backward compatibility with legacy event formats
✅ Testing Support: Automatic mocking in test environments
✅ Metrics Integration: Prometheus metrics for monitoring (via @vcita/infra-nestjs)
Installation
npm install @vcita/event-bus-nestjs
Required Peer Dependencies:
npm install @nestjs/common @nestjs/core @vcita/infra-nestjs @vcita/oauth-client-nestjs
Quick Start
1. Import the Module
import { Module } from '@nestjs/common';
import { EventBusModule } from '@vcita/event-bus-nestjs';
@Module({
imports: [
EventBusModule,
],
})
export class AppModule {}
2. Publish an Event
import { Injectable } from '@nestjs/common';
import { EventBusPublisher } from '@vcita/event-bus-nestjs';
@Injectable()
export class UserService {
constructor(private readonly eventBusPublisher: EventBusPublisher) {}
async createUser(userData: any, actor: any) {
const user = await this.saveUser(userData);
await this.eventBusPublisher.publish({
entityType: 'user',
eventType: 'created',
data: user,
actor: actor,
});
return user;
}
}
3. Subscribe to Events
import { Injectable } from '@nestjs/common';
import { AuthorizationPayloadEntity } from '@vcita/oauth-client-nestjs';
import { InfraLoggerService } from '@vcita/infra-nestjs';
import { SubscribeTo, EventPayload, EventHeaders } from '@vcita/event-bus-nestjs';
@Injectable()
export class UserSubscriber {
private readonly logger = new InfraLoggerService(UserSubscriber.name);
@SubscribeTo({
domain: 'scheduling',
entity: 'user',
action: 'created',
})
async handleUserCreated(
auth: AuthorizationPayloadEntity,
event: EventPayload<{ id: string; email: string }>,
headers: EventHeaders,
): Promise<void> {
this.logger.log(`User created: ${event.data.id} by ${auth.actor.id}`);
}
}
Don't forget to register your subscriber in your module:
import { Module } from '@nestjs/common';
import { EventBusModule } from '@vcita/event-bus-nestjs';
import { UserSubscriber } from './user.subscriber';
@Module({
imports: [EventBusModule],
providers: [UserSubscriber],
})
export class AppModule {}
Configuration
The module is configured entirely through environment variables, which are loaded automatically when the module is imported.
Configuration Options
The module reads configuration from the following environment variables:
| Variable | Type | Required | Description | Default |
| --- | --- | --- | --- | --- |
| RABBITMQ_DSN | string | ✅ | RabbitMQ connection string | - |
| APP_NAME | string | ✅ | Your service name (used for queues and source service) | - |
| EVENT_BUS_EXCHANGE_NAME | string | ❌ | RabbitMQ exchange name for standard events | event_bus |
| EVENT_BUS_DEFAULT_DOMAIN | string | ❌ | Default domain for routing keys | default |
| EVENT_BUS_LEGACY_EXCHANGE | string | ❌ | Exchange name for legacy events | vcita.model_updates |
| EVENT_BUS_DEFAULT_MAX_RETRIES | number | ❌ | Default retry count | 1 |
| EVENT_BUS_DEFAULT_RETRY_DELAY_MS | number | ❌ | Default retry delay in milliseconds | 10000 |
Example Configuration
Set these environment variables in your .env file or deployment environment:
RABBITMQ_DSN=amqp://username:password@localhost:5672
APP_NAME=my-service
EVENT_BUS_EXCHANGE_NAME=event_bus
EVENT_BUS_DEFAULT_DOMAIN=scheduling
EVENT_BUS_LEGACY_EXCHANGE=vcita.model_updates
EVENT_BUS_DEFAULT_MAX_RETRIES=1
EVENT_BUS_DEFAULT_RETRY_DELAY_MS=10000
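To make the required/optional split and the defaults in the table concrete, here is a minimal sketch of how such variables could be read and validated. It is not the module's own loader: `EventBusConfig`, `loadEventBusConfig`, `requireVar`, and `numberVar` are hypothetical names introduced for illustration, and only the variable names and default values come from the table above.

```typescript
// Hypothetical helper, not part of @vcita/event-bus-nestjs.
interface EventBusConfig {
  rabbitmqDsn: string;
  appName: string;
  exchangeName: string;
  defaultDomain: string;
  legacyExchange: string;
  defaultMaxRetries: number;
  defaultRetryDelayMs: number;
}

type Env = Record<string, string | undefined>;

// Returns the value of a required variable or throws if it is missing or empty.
function requireVar(env: Env, name: string): string {
  const value = env[name];
  if (value === undefined || value === '') {
    throw new Error(`Missing required environment variable: ${name}`);
  }
  return value;
}

// Parses an optional numeric variable, falling back to the documented default.
function numberVar(env: Env, name: string, fallback: number): number {
  const raw = env[name];
  if (raw === undefined || raw === '') return fallback;
  const parsed = Number(raw);
  if (!Number.isFinite(parsed)) {
    throw new Error(`${name} must be a number, got "${raw}"`);
  }
  return parsed;
}

function loadEventBusConfig(env: Env): EventBusConfig {
  return {
    rabbitmqDsn: requireVar(env, 'RABBITMQ_DSN'),
    appName: requireVar(env, 'APP_NAME'),
    exchangeName: env['EVENT_BUS_EXCHANGE_NAME'] ?? 'event_bus',
    defaultDomain: env['EVENT_BUS_DEFAULT_DOMAIN'] ?? 'default',
    legacyExchange: env['EVENT_BUS_LEGACY_EXCHANGE'] ?? 'vcita.model_updates',
    defaultMaxRetries: numberVar(env, 'EVENT_BUS_DEFAULT_MAX_RETRIES', 1),
    defaultRetryDelayMs: numberVar(env, 'EVENT_BUS_DEFAULT_RETRY_DELAY_MS', 10000),
  };
}
```

Calling `loadEventBusConfig(process.env)` at startup would fail fast when `RABBITMQ_DSN` or `APP_NAME` is missing, which is usually easier to diagnose than a connection error later on.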
Publishing Events
Basic Publishing
import { Injectable } from '@nestjs/common';
import { EventBusPublisher } from '@vcita/event-bus-nestjs';
@Injectable()
export class MyService {
constructor(private readonly eventBusPublisher: EventBusPublisher) {}
async publishEvent() {
await this.eventBusPublisher.publish({
entityType: 'resource',
eventType: 'created',
data: { id: '123', name: 'Resource Name' },
actor: { id: 'user-1', type: 'user' },
version: 'v2',
domain: 'payments',
});
}
}
Event Types
Common event types include:
created - Entity was created
updated - Entity was updated
deleted - Entity was deleted
You can also use custom event types as needed.
Actor Information
The actor field describes who or what triggered the event:
// A human user as the actor
actor: {
id: 'user-123',
type: 'user',
email: 'user@example.com',
name: 'John Doe'
}
// An automated process as the actor
actor: {
id: 'system',
type: 'system',
name: 'Automated Process'
}
Subscribing to Events
Standard Event Subscription
Use the @SubscribeTo decorator for structured events:
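import { Injectable } from '@nestjs/common';
import { AuthorizationPayloadEntity } from '@vcita/oauth-client-nestjs';
import { SubscribeTo, EventPayload, EventHeaders } from '@vcita/event-bus-nestjs';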
@Injectable()
export class MySubscriber {
@SubscribeTo({
domain: 'payments',
entity: 'product',
action: 'created',
queue: 'my-custom-queue',
retry: {
count: 5,
delayMs: 10000
}
})
async handleProductCreated(
auth: AuthorizationPayloadEntity,
event: EventPayload<ProductData>,
headers: EventHeaders,
): Promise<void> {
// Process the event here
}
}
Wildcard Subscriptions
You can use wildcards to subscribe to multiple events:
@Injectable()
export class ProductSubscriber {
private readonly logger = new InfraLoggerService(ProductSubscriber.name);
@SubscribeTo({
domain: 'payments',
entity: 'product',
action: '*',
})
async handleAllProductEvents(
auth: AuthorizationPayloadEntity,
event: EventPayload<any>,
headers: EventHeaders,
): Promise<void> {
this.logger.log(`Product event: ${headers.event_type}`);
}
@SubscribeTo({
domain: 'payments',
entity: '*',
action: 'created',
})
async handleAllPaymentCreations(
auth: AuthorizationPayloadEntity,
event: EventPayload<any>,
headers: EventHeaders,
): Promise<void> {
this.logger.log(`Created in payments: ${headers.entity_type}`);
}
}
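The following sketch illustrates how a wildcard subscription selects events. It assumes the decorator options are combined into a `{domain}.{entity}.{action}` binding pattern and that `*` matches exactly one dot-separated segment, as it does on an AMQP topic exchange. In practice the broker performs this matching; `matchesPattern` is a hypothetical helper that exists only to show the rule.

```typescript
// Illustrative only: on a real deployment the RabbitMQ topic exchange does this matching.
function matchesPattern(pattern: string, routingKey: string): boolean {
  const patternParts = pattern.split('.');
  const keyParts = routingKey.split('.');
  // "*" stands for exactly one segment, so both sides need the same number of segments.
  if (patternParts.length !== keyParts.length) return false;
  return patternParts.every((part, i) => part === '*' || part === keyParts[i]);
}

// The two subscriptions above, written as binding patterns:
const allProductEvents = 'payments.product.*';
const allPaymentCreations = 'payments.*.created';

console.log(matchesPattern(allProductEvents, 'payments.product.updated'));
console.log(matchesPattern(allPaymentCreations, 'payments.invoice.created'));
console.log(matchesPattern(allPaymentCreations, 'billing.invoice.created'));
```

Note that an event such as `payments.product.created` satisfies both patterns, so both handlers in the class above would receive it.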
Legacy Event Subscription
For backward compatibility with legacy events:
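import { Injectable } from '@nestjs/common';
import { InfraLoggerService } from '@vcita/infra-nestjs';
import { LegacySubscribeTo } from '@vcita/event-bus-nestjs';
@Injectable()
export class LegacySubscriber {
private readonly logger = new InfraLoggerService(LegacySubscriber.name);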
@LegacySubscribeTo({
routingKey: 'legacy.orders.*',
retry: { count: 1, delayMs: 10000 }
})
async handleLegacyOrder(
payload: unknown,
headers: any,
): Promise<void> {
this.logger.log(`Legacy order: ${JSON.stringify(payload)}`);
}
}
Method Signatures
Standard Subscription Method:
async methodName(
auth: AuthorizationPayloadEntity,
event: EventPayload<T>,
headers: EventHeaders,
): Promise<void>
Legacy Subscription Method:
async methodName(
payload: unknown,
headers: any,
): Promise<void>
Event Structure
Published Event Format
Every published event follows this standardized structure:
{
headers: {
event_uid: "550e8400-e29b-41d4-a716-446655440000",
entity_type: "user",
event_type: "created",
timestamp: "2023-01-01T12:00:00.000Z",
source_service: "user-service",
trace_id: "abc123",
actor: {
id: "user-123",
type: "user",
email: "user@example.com"
},
version: "v1"
},
payload: {
data: {
id: "user-456",
email: "newuser@example.com",
name: "New User"
},
schema_ref: "user/created/v1"
}
}
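For readers who prefer types to sample objects, the structure above can be sketched as TypeScript interfaces plus a small builder. This is not the library's implementation: `Actor`, `EventEnvelope`, `EnvelopeInput`, and `buildEnvelope` are hypothetical names, and two details are assumptions inferred from the example rather than documented behavior, namely that `version` defaults to `v1` and that `schema_ref` is `<entity>/<event>/<version>`.

```typescript
import { randomUUID } from 'node:crypto';

// All names below are hypothetical; they mirror the documented event shape only.
interface Actor {
  id: string;
  type: string;
  email?: string;
  name?: string;
}

interface EventEnvelope<T> {
  headers: {
    event_uid: string;
    entity_type: string;
    event_type: string;
    timestamp: string;
    source_service: string;
    trace_id: string;
    actor: Actor;
    version: string;
  };
  payload: {
    data: T;
    schema_ref: string;
  };
}

interface EnvelopeInput<T> {
  entityType: string;
  eventType: string;
  data: T;
  actor: Actor;
  sourceService: string;
  traceId: string;
  version?: string;
}

function buildEnvelope<T>(input: EnvelopeInput<T>): EventEnvelope<T> {
  // Assumption: the version defaults to "v1", as in the documented example.
  const version = input.version ?? 'v1';
  return {
    headers: {
      event_uid: randomUUID(),
      entity_type: input.entityType,
      event_type: input.eventType,
      timestamp: new Date().toISOString(),
      source_service: input.sourceService,
      trace_id: input.traceId,
      actor: input.actor,
      version,
    },
    payload: {
      data: input.data,
      // Assumption: schema_ref is "<entity>/<event>/<version>", inferred from the example.
      schema_ref: `${input.entityType}/${input.eventType}/${version}`,
    },
  };
}
```

Typing the envelope with a generic `T` keeps the `data` field checked end to end, which matches how subscribers receive `EventPayload<T>`.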
Routing Keys
Events are routed using the pattern: {domain}.{entityType}.{eventType}
Examples:
scheduling.user.created
payments.product.updated
billing.subscription.deleted
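The pattern can be expressed as a one-line helper. `buildRoutingKey` is a hypothetical function, not part of the package, and the check that rejects empty segments or segments containing a dot is an added safeguard rather than documented behavior.

```typescript
// Hypothetical helper that assembles a routing key from its three parts.
function buildRoutingKey(domain: string, entityType: string, eventType: string): string {
  for (const part of [domain, entityType, eventType]) {
    // A dot inside a segment would silently change the number of segments.
    if (part === '' || part.includes('.')) {
      throw new Error(`Invalid routing key segment: "${part}"`);
    }
  }
  return `${domain}.${entityType}.${eventType}`;
}

console.log(buildRoutingKey('scheduling', 'user', 'created'));
```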
Error Handling & Retries
Built-in Retry Logic
The module automatically retries failed event processing with exponential backoff:
@SubscribeTo({
domain: 'payments',
entity: 'product',
action: 'created',
retry: {
count: 5,
delayMs: 10000
}
})
async handleProductCreated(
auth: AuthorizationPayloadEntity,
event: EventPayload<ProductData>,
headers: EventHeaders,
): Promise<void> {
}
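To give a feel for what exponential backoff means for the `retry` settings above, the sketch below computes a delay schedule. The README does not state the growth factor, so the doubling used here is an assumption, and `backoffDelays` is a hypothetical helper rather than a library function.

```typescript
// Assumption: the delay doubles on every retry, starting from delayMs.
function backoffDelays(count: number, delayMs: number): number[] {
  const delays: number[] = [];
  for (let attempt = 0; attempt < count; attempt++) {
    delays.push(delayMs * 2 ** attempt);
  }
  return delays;
}

// With count: 5 and delayMs: 10000 this yields 10s, 20s, 40s, 80s, 160s.
console.log(backoffDelays(5, 10000));
```

Under this assumption the five retries span a little over five minutes in total, which is worth keeping in mind when choosing `count` for handlers that call slow downstream services.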
Non-Retryable Errors
Some errors shouldn't be retried (e.g., validation errors):
import { NonRetryableError } from '@vcita/event-bus-nestjs';
@SubscribeTo({
domain: 'payments',
entity: 'product',
action: 'created',
})
async handleProductCreated(
auth: AuthorizationPayloadEntity,
event: EventPayload<ProductData>,
headers: EventHeaders,
): Promise<void> {
try {
await this.validateProduct(event.data);
await this.processProduct(event.data);
} catch (error) {
if (error instanceof ValidationError) {
throw new NonRetryableError(error.message);
}
throw error;
}
}
Dead Letter Queues
After all retries are exhausted, messages are sent to dead letter queues for manual inspection.
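The interaction between retries, non-retryable errors, and dead letter queues can be summarized as a small decision function. This is a sketch under stated assumptions, not the module's internal logic: `SketchNonRetryableError` is a local stand-in for the library's `NonRetryableError`, `decideOutcome` is hypothetical, and routing a non-retryable failure straight to the dead letter queue is an assumption the README does not confirm.

```typescript
// Local stand-in for the library's NonRetryableError so the sketch is self-contained.
class SketchNonRetryableError extends Error {
  constructor(message: string) {
    super(message);
    // Keeps instanceof working when compiling to older JavaScript targets.
    Object.setPrototypeOf(this, new.target.prototype);
  }
}

type Outcome = 'retry' | 'dead-letter';

// retriesSoFar counts the retries already performed for this message.
function decideOutcome(error: unknown, retriesSoFar: number, maxRetries: number): Outcome {
  // Assumption: non-retryable failures skip the remaining retries entirely.
  if (error instanceof SketchNonRetryableError) return 'dead-letter';
  return retriesSoFar < maxRetries ? 'retry' : 'dead-letter';
}

console.log(decideOutcome(new Error('timeout'), 0, 1));
console.log(decideOutcome(new Error('timeout'), 1, 1));
console.log(decideOutcome(new SketchNonRetryableError('invalid payload'), 0, 5));
```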
Testing
Test Environment Setup
In test environments (NODE_ENV=test), the module automatically mocks AMQP connections:
import { Test } from '@nestjs/testing';
import { EventBusModule, EventBusPublisher } from '@vcita/event-bus-nestjs';
describe('MyService', () => {
let service: MyService;
let eventBusPublisher: EventBusPublisher;
beforeEach(async () => {
const module = await Test.createTestingModule({
imports: [EventBusModule],
providers: [MyService],
}).compile();
service = module.get<MyService>(MyService);
eventBusPublisher = module.get<EventBusPublisher>(EventBusPublisher);
});
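it('should publish events', async () => {
// Spy on publish so the assertion works whether or not the publisher is already a Jest mock
const publishSpy = jest.spyOn(eventBusPublisher, 'publish').mockResolvedValue(undefined);
const userData = { email: 'user@example.com' };
const actor = { id: 'user-1', type: 'user' };
const user = await service.createUser(userData, actor);
expect(publishSpy).toHaveBeenCalledWith({
entityType: 'user',
eventType: 'created',
data: user,
actor: actor,
});
});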
});
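When a service is unit-tested in isolation, without the Nest testing module, a hand-rolled fake can stand in for the publisher. The sketch below is one way to do that: `RecordingPublisher` and `PublishOptions` are hypothetical names, and the option fields simply mirror those listed for `PublishEventOptions` in the API Reference.

```typescript
// Mirrors the documented PublishEventOptions fields; defined locally to stay self-contained.
interface PublishOptions<T = unknown> {
  entityType: string;
  eventType: string;
  data: T;
  actor: { id: string; type: string };
  version?: string;
  domain?: string;
}

// Hypothetical test double that records every publish call instead of talking to RabbitMQ.
class RecordingPublisher {
  readonly published: PublishOptions[] = [];

  async publish<T>(options: PublishOptions<T>): Promise<void> {
    this.published.push(options);
  }
}

// Example: pass the fake wherever the service expects a publisher.
const fakePublisher = new RecordingPublisher();
void fakePublisher.publish({
  entityType: 'user',
  eventType: 'created',
  data: { id: 'user-456' },
  actor: { id: 'user-1', type: 'user' },
});
console.log(fakePublisher.published.length);
```

Because the fake keeps the full options objects, assertions can inspect `published[0].data` or `published[0].actor` directly instead of relying on mock call matchers.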
Disabling Event Bus
You can disable the event bus for testing by setting:
DISABLE_EVENT_BUS=true
Environment Variables
See the Configuration section for detailed information about all environment variables.
Testing Variables
DISABLE_EVENT_BUS=true
Advanced Usage
Custom Queue Configuration
@SubscribeTo({
domain: 'payments',
entity: 'product',
action: 'created',
queue: 'my-custom-queue',
queueOptions: {
durable: true,
arguments: {
'x-message-ttl': 3600000,
},
},
errorQueueOptions: {
durable: true,
arguments: {
'x-message-ttl': 86400000,
},
},
})
async handleProductCreated() {
}
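The `x-message-ttl` values above are expressed in milliseconds: 3600000 is one hour and 86400000 is 24 hours. A tiny helper can make that intent visible at the call site; `hoursToMs` is a hypothetical name introduced for illustration.

```typescript
// Converts hours to milliseconds, the unit RabbitMQ expects for x-message-ttl.
function hoursToMs(hours: number): number {
  return hours * 60 * 60 * 1000;
}

const queueTtlMs = hoursToMs(1);
const errorQueueTtlMs = hoursToMs(24);

console.log(queueTtlMs, errorQueueTtlMs);
```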
Multiple Event Handlers
@Injectable()
export class ProductSubscriber {
@SubscribeTo({
domain: 'payments',
entity: 'product',
action: 'created',
})
async handleProductCreated() {
}
@SubscribeTo({
domain: 'payments',
entity: 'product',
action: 'updated',
})
async handleProductUpdated() {
}
@SubscribeTo({
domain: 'payments',
entity: 'product',
action: 'deleted',
})
async handleProductDeleted() {
}
}
API Reference
EventBusPublisher
class EventBusPublisher<T = unknown> {
async publish(options: PublishEventOptions<T>): Promise<void>
}
PublishEventOptions
interface PublishEventOptions<T = unknown> {
entityType: string;
eventType: EventType;
data: T;
actor: Actor;
version?: string;
domain?: string;
}
SubscribeTo Options
interface SubscribeToOptions {
domain: string | '*';
entity: string | '*';
action: EventType;
queue?: string;
retry?: {
count?: number;
delayMs?: number;
};
queueOptions?: object;
errorQueueOptions?: object;
}
LegacySubscribeTo Options
interface LegacySubscribeToOptions {
routingKey: string;
queue?: string;
retry?: {
count?: number;
delayMs?: number;
};
queueOptions?: object;
errorQueueOptions?: object;
}
License
ISC