
@vcita/event-bus-nestjs
A comprehensive NestJS module for publishing and subscribing to standardized events via RabbitMQ/AMQP with built-in tracing, retry mechanisms, and structured event formatting.
✅ Standardized Event Publishing: Automatically structures events with headers (timestamps, trace IDs, actor info)
✅ Flexible Event Subscription: Subscribe to events using decorators with pattern matching
✅ AMQP Connection Management: Handles RabbitMQ connections, queues, and exchanges automatically
✅ Distributed Tracing: Built-in support for tracing across services
✅ Retry Mechanisms: Configurable retry logic
✅ Error Handling: Comprehensive error handling with error queues
✅ Legacy Support: Backward compatibility with legacy event formats
✅ Testing Support: Automatic mocking in test environments
✅ Metrics Integration: Prometheus metrics for monitoring (via @vcita/infra-nestjs)
npm install @vcita/event-bus-nestjs
Required Peer Dependencies:
npm install @nestjs/common @nestjs/core @vcita/infra-nestjs @vcita/oauth-client-nestjs
You can import the modules individually based on your needs:
For Publishing Only:
// app.module.ts
import { Module } from '@nestjs/common';
import { PublisherModule } from '@vcita/event-bus-nestjs';
@Module({
imports: [
PublisherModule,
],
})
export class AppModule {}
For Subscribing Only:
// app.module.ts
import { Module } from '@nestjs/common';
import { SubscriberModule } from '@vcita/event-bus-nestjs';
@Module({
imports: [
SubscriberModule,
],
})
export class AppModule {}
For Both Publishing and Subscribing:
// app.module.ts
import { Module } from '@nestjs/common';
import { PublisherModule, SubscriberModule } from '@vcita/event-bus-nestjs';
@Module({
imports: [
PublisherModule,
SubscriberModule,
],
})
export class AppModule {}
// my.service.ts
import { Injectable } from '@nestjs/common';
import { EventBusPublisher } from '@vcita/event-bus-nestjs';
@Injectable()
export class UserService {
constructor(private readonly eventBusPublisher: EventBusPublisher) {}
async createUser(userData: any, actor: any) {
const user = await this.saveUser(userData);
// Publish event - routing key will be: scheduling.user.created
await this.eventBusPublisher.publish({
entityType: 'user',
eventType: 'created',
data: user,
actor: actor,
});
return user;
}
}
// user.subscriber.ts
import { Injectable } from '@nestjs/common';
import { AuthorizationPayloadEntity } from '@vcita/oauth-client-nestjs';
import { InfraLoggerService } from '@vcita/infra-nestjs';
import { SubscribeTo, EventPayload, EventHeaders } from '@vcita/event-bus-nestjs';
@Injectable()
export class UserSubscriber {
private readonly logger = new InfraLoggerService(UserSubscriber.name);
@SubscribeTo({
domain: 'scheduling',
entity: 'user',
action: 'created',
})
async handleUserCreated(
auth: AuthorizationPayloadEntity,
event: EventPayload<{ id: string; email: string }>,
headers: EventHeaders,
): Promise<void> {
this.logger.log(`User created: ${event.data.id} by ${auth.actor.id}`);
// Your business logic here
}
}
Don't forget to register your subscriber in your module:
// app.module.ts
import { Module } from '@nestjs/common';
import { SubscriberModule } from '@vcita/event-bus-nestjs';
import { UserSubscriber } from './user.subscriber';
@Module({
imports: [SubscriberModule],
providers: [UserSubscriber], // Add your subscribers here
})
export class AppModule {}
The module is configured through environment variables, which are loaded automatically when the module is imported.
It reads the following variables:
Environment Variable | Type | Required | Description | Default |
---|---|---|---|---|
RABBITMQ_DSN | string | ✅ | RabbitMQ connection string | - |
APP_NAME | string | ✅ | Your service name (used for queues and source service) | - |
EVENT_BUS_EXCHANGE_NAME | string | ❌ | RabbitMQ exchange name for standard events | event_bus |
EVENT_BUS_DEFAULT_DOMAIN | string | ❌ | Default domain for routing keys | default |
EVENT_BUS_LEGACY_EXCHANGE | string | ❌ | Exchange name for legacy events | vcita.model_updates |
EVENT_BUS_DEFAULT_MAX_RETRIES | number | ❌ | Default retry count | 1 |
EVENT_BUS_DEFAULT_RETRY_DELAY_MS | number | ❌ | Default retry delay in milliseconds | 10000 |
Set these environment variables in your .env file or deployment environment:
# Required
RABBITMQ_DSN=amqp://username:password@localhost:5672
APP_NAME=my-service
# Optional
EVENT_BUS_EXCHANGE_NAME=event_bus
EVENT_BUS_DEFAULT_DOMAIN=scheduling
EVENT_BUS_LEGACY_EXCHANGE=vcita.model_updates
EVENT_BUS_DEFAULT_MAX_RETRIES=1
EVENT_BUS_DEFAULT_RETRY_DELAY_MS=10000
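To make the required/optional split and the defaults in the table concrete, here is a minimal sketch of how such variables could be read and validated. It is not the module's actual loader: `EventBusConfigSketch`, `loadEventBusConfigSketch`, and the helper names are hypothetical, and only the variable names and default values come from the table above.

```typescript
// Hypothetical config shape; field names are illustrative, not the module's API.
interface EventBusConfigSketch {
  rabbitmqDsn: string;
  appName: string;
  exchangeName: string;
  defaultDomain: string;
  legacyExchange: string;
  defaultMaxRetries: number;
  defaultRetryDelayMs: number;
}

type EnvLikeSketch = Record<string, string | undefined>;

// Required variables: fail fast when missing or empty.
function requireVarSketch(env: EnvLikeSketch, key: string): string {
  const value = env[key];
  if (value === undefined || value === '') {
    throw new Error(`Missing required environment variable: ${key}`);
  }
  return value;
}

// Optional numeric variables: fall back to the documented default.
function numberVarSketch(env: EnvLikeSketch, key: string, fallback: number): number {
  const raw = env[key];
  if (raw === undefined || raw === '') return fallback;
  const parsed = Number(raw);
  if (!Number.isFinite(parsed)) {
    throw new Error(`Environment variable ${key} must be a number, got "${raw}"`);
  }
  return parsed;
}

function loadEventBusConfigSketch(env: EnvLikeSketch): EventBusConfigSketch {
  return {
    rabbitmqDsn: requireVarSketch(env, 'RABBITMQ_DSN'),
    appName: requireVarSketch(env, 'APP_NAME'),
    exchangeName: env['EVENT_BUS_EXCHANGE_NAME'] ?? 'event_bus',
    defaultDomain: env['EVENT_BUS_DEFAULT_DOMAIN'] ?? 'default',
    legacyExchange: env['EVENT_BUS_LEGACY_EXCHANGE'] ?? 'vcita.model_updates',
    defaultMaxRetries: numberVarSketch(env, 'EVENT_BUS_DEFAULT_MAX_RETRIES', 1),
    defaultRetryDelayMs: numberVarSketch(env, 'EVENT_BUS_DEFAULT_RETRY_DELAY_MS', 10000),
  };
}
```

In a Node.js service you would pass `process.env` as the argument. Taking the environment as a parameter keeps the sketch easy to test.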
import { Injectable } from '@nestjs/common';
import { EventBusPublisher } from '@vcita/event-bus-nestjs';
@Injectable()
export class MyService {
constructor(private readonly eventBusPublisher: EventBusPublisher) {}
async publishEvent() {
await this.eventBusPublisher.publish({
entityType: 'resource', // Required: Entity type
eventType: 'created', // Required: Event type
data: { id: '123', name: 'Resource Name' }, // Required: Event data
actor: { id: 'user-1', type: 'user' }, // Required: Actor who triggered the event
version: 'v2', // Optional: Schema version (default: 'v1')
domain: 'payments', // Optional: Domain override (default: from config)
});
}
}
Common event types include:
- created - Entity was created
- updated - Entity was updated
- deleted - Entity was deleted
You can also use custom event types as needed.
The actor field describes who or what triggered the event:
// User-triggered event
actor: {
id: 'user-123',
type: 'user',
email: 'user@example.com',
name: 'John Doe'
}
// System-triggered event
actor: {
id: 'system',
type: 'system',
name: 'Automated Process'
}
Use the @SubscribeTo decorator for structured events:
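import { Injectable } from '@nestjs/common';
import { AuthorizationPayloadEntity } from '@vcita/oauth-client-nestjs';
import { SubscribeTo, EventPayload, EventHeaders } from '@vcita/event-bus-nestjs';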
@Injectable()
export class MySubscriber {
@SubscribeTo({
domain: 'payments', // Domain to listen to
entity: 'product', // Entity type to listen to
action: 'created', // Action to listen to
queue: 'my-custom-queue', // Optional: custom queue name
retry: { // Optional: retry configuration
count: 5,
delayMs: 10000
}
})
async handleProductCreated(
auth: AuthorizationPayloadEntity, // Authentication context
event: EventPayload<ProductData>, // Event data
headers: EventHeaders, // Event metadata
): Promise<void> {
// Your business logic here
}
}
You can use wildcards to subscribe to multiple events:
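// Imports are the same as in the UserSubscriber example above
@Injectable()
export class ProductSubscriber {
private readonly logger = new InfraLoggerService(ProductSubscriber.name);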
// Listen to all product events
@SubscribeTo({
domain: 'payments',
entity: 'product',
action: '*',
})
async handleAllProductEvents(
auth: AuthorizationPayloadEntity,
event: EventPayload<any>,
headers: EventHeaders,
): Promise<void> {
this.logger.log(`Product event: ${headers.event_type}`);
}
// Listen to all entities in payments domain
@SubscribeTo({
domain: 'payments',
entity: '*',
action: 'created',
})
async handleAllPaymentCreations(
auth: AuthorizationPayloadEntity,
event: EventPayload<any>,
headers: EventHeaders,
): Promise<void> {
this.logger.log(`Created in payments: ${headers.entity_type}`);
}
}
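The wildcard subscriptions above can be read as routing-key patterns of the form {domain}.{entity}.{action}. The sketch below shows how a single-word wildcard behaves, assuming the module binds queues to an AMQP topic exchange, where `*` matches exactly one dot-separated word. The function names are hypothetical and are not part of @vcita/event-bus-nestjs. The multi-word `#` wildcard is deliberately not handled.

```typescript
// Hypothetical subscription shape mirroring the decorator options shown above.
interface SubscriptionPatternSketch {
  domain: string;
  entity: string;
  action: string;
}

// Join the three parts into a binding pattern such as "payments.product.*".
function toBindingPatternSketch(sub: SubscriptionPatternSketch): string {
  return `${sub.domain}.${sub.entity}.${sub.action}`;
}

// Topic-style match where "*" stands for exactly one word.
function topicMatchesSketch(pattern: string, routingKey: string): boolean {
  const patternWords = pattern.split('.');
  const keyWords = routingKey.split('.');
  if (patternWords.length !== keyWords.length) return false;
  return patternWords.every((word, i) => word === '*' || word === keyWords[i]);
}

const allProductEvents = toBindingPatternSketch({
  domain: 'payments',
  entity: 'product',
  action: '*',
});

topicMatchesSketch(allProductEvents, 'payments.product.updated'); // true
topicMatchesSketch(allProductEvents, 'payments.invoice.updated'); // false
```

Because `*` consumes exactly one word, a pattern with three parts never matches a routing key with two or four parts.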
For backward compatibility with legacy events:
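import { Injectable } from '@nestjs/common';
import { InfraLoggerService } from '@vcita/infra-nestjs';
import { LegacySubscribeTo } from '@vcita/event-bus-nestjs';
@Injectable()
export class LegacySubscriber {
private readonly logger = new InfraLoggerService(LegacySubscriber.name);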
@LegacySubscribeTo({
routingKey: 'legacy.orders.*', // RabbitMQ routing key pattern
retry: { count: 1, delayMs: 10000 }
})
async handleLegacyOrder(
payload: unknown, // Raw event payload
headers: any, // Raw AMQP headers
): Promise<void> {
this.logger.log(`Legacy order: ${JSON.stringify(payload)}`);
}
}
Standard Subscription Method:
async methodName(
auth: AuthorizationPayloadEntity, // Actor context with authentication
event: EventPayload<T>, // Structured event data
headers: EventHeaders, // Event metadata
): Promise<void>
Legacy Subscription Method:
async methodName(
payload: unknown, // Raw event payload
headers: any, // Raw AMQP headers
): Promise<void>
Every published event follows this standardized structure:
{
headers: {
event_uid: "550e8400-e29b-41d4-a716-446655440000", // Unique event ID
entity_type: "user", // Entity type
event_type: "created", // Event type
timestamp: "2023-01-01T12:00:00.000Z", // ISO timestamp
source_service: "user-service", // Publishing service
trace_id: "abc123", // Distributed tracing ID
actor: { // Who triggered the event
id: "user-123",
type: "user",
email: "user@example.com"
},
version: "v1" // Schema version
},
payload: {
data: { // Your event data
id: "user-456",
email: "newuser@example.com",
name: "New User"
},
schema_ref: "user/created/v1" // Schema reference
}
}
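As a rough illustration of how publish options might map onto this structure, here is a typed sketch. The type and function names are hypothetical. The `schema_ref` format `{entityType}/{eventType}/{version}` is inferred from the single example above and may not be the module's actual rule. The event ID, trace ID, and clock are passed in explicitly because the source does not say how the module generates them.

```typescript
// Hypothetical types modelled on the example structure above.
interface ActorSketch {
  id: string;
  type: string;
  email?: string;
  name?: string;
}

interface EventEnvelopeSketch<T> {
  headers: {
    event_uid: string;
    entity_type: string;
    event_type: string;
    timestamp: string;
    source_service: string;
    trace_id: string;
    actor: ActorSketch;
    version: string;
  };
  payload: {
    data: T;
    schema_ref: string;
  };
}

interface EnvelopeInputSketch<T> {
  entityType: string;
  eventType: string;
  data: T;
  actor: ActorSketch;
  version?: string;
}

interface EnvelopeContextSketch {
  eventUid: string;       // assumed to be generated by the caller, e.g. a UUID
  traceId: string;        // assumed to come from the tracing context
  sourceService: string;  // APP_NAME in the configuration
  now: Date;
}

function buildEnvelopeSketch<T>(
  input: EnvelopeInputSketch<T>,
  ctx: EnvelopeContextSketch,
): EventEnvelopeSketch<T> {
  const version = input.version ?? 'v1'; // documented default
  return {
    headers: {
      event_uid: ctx.eventUid,
      entity_type: input.entityType,
      event_type: input.eventType,
      timestamp: ctx.now.toISOString(),
      source_service: ctx.sourceService,
      trace_id: ctx.traceId,
      actor: input.actor,
      version,
    },
    payload: {
      data: input.data,
      // Assumption: inferred from the "user/created/v1" example.
      schema_ref: `${input.entityType}/${input.eventType}/${version}`,
    },
  };
}
```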
Events are routed using the pattern: {domain}.{entityType}.{eventType}
Examples:
scheduling.user.created
payments.product.updated
billing.subscription.deleted
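The routing pattern can be sketched as a small helper that falls back to the configured default domain when the publish options do not override it. `buildRoutingKeySketch` is a hypothetical name. The guard against empty or dotted segments is an added assumption, not documented behaviour.

```typescript
// Hypothetical input mirroring the relevant publish options.
interface RoutingKeyPartsSketch {
  entityType: string;
  eventType: string;
  domain?: string; // optional override, as in PublishEventOptions
}

function buildRoutingKeySketch(
  parts: RoutingKeyPartsSketch,
  defaultDomain: string, // EVENT_BUS_DEFAULT_DOMAIN in the configuration
): string {
  const segments = [parts.domain ?? defaultDomain, parts.entityType, parts.eventType];
  for (const segment of segments) {
    // Added guard (assumption): a dot inside a segment would change the key's shape.
    if (segment === '' || segment.includes('.')) {
      throw new Error(`Invalid routing key segment: "${segment}"`);
    }
  }
  return segments.join('.');
}

buildRoutingKeySketch({ entityType: 'user', eventType: 'created' }, 'scheduling');
// 'scheduling.user.created'
buildRoutingKeySketch({ entityType: 'product', eventType: 'updated', domain: 'payments' }, 'scheduling');
// 'payments.product.updated'
```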
The module automatically retries failed event processing:
@SubscribeTo({
domain: 'payments',
entity: 'product',
action: 'created',
retry: {
count: 5, // Retry up to 5 times
delayMs: 10000 // Delay between retries
}
})
async handleProductCreated(
auth: AuthorizationPayloadEntity,
event: EventPayload<ProductData>,
headers: EventHeaders,
): Promise<void> {
// Your logic here
}
Some errors shouldn't be retried (e.g., validation errors):
import { NonRetryableError } from '@vcita/event-bus-nestjs';
@SubscribeTo({
domain: 'payments',
entity: 'product',
action: 'created',
})
async handleProductCreated(
auth: AuthorizationPayloadEntity,
event: EventPayload<ProductData>,
headers: EventHeaders,
): Promise<void> {
try {
await this.validateProduct(event.data);
await this.processProduct(event.data);
} catch (error) {
if (error instanceof ValidationError) {
// Don't retry validation errors
throw new NonRetryableError(error.message);
}
// Other errors will be retried
throw error;
}
}
After all retries are exhausted, messages are sent to error queues for manual inspection.
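The retry behaviour described in this section can be sketched as a simple control loop. This is a simplified synchronous model, not the module's implementation: `NonRetryableErrorSketch` and `processWithRetrySketch` are local stand-ins, the delay between attempts (delayMs) is omitted, and it assumes that `count` means retries after the first attempt and that non-retryable failures also end up in the error queue.

```typescript
// Local stand-in for the NonRetryableError exported by the module.
class NonRetryableErrorSketch extends Error {
  constructor(message: string) {
    super(message);
    // Keeps instanceof working when compiled to older JavaScript targets.
    Object.setPrototypeOf(this, NonRetryableErrorSketch.prototype);
    this.name = 'NonRetryableErrorSketch';
  }
}

interface RetryOutcomeSketch {
  status: 'processed' | 'error-queue';
  attempts: number;
}

function processWithRetrySketch(
  handler: () => void,
  retryCount: number,
): RetryOutcomeSketch {
  let attempts = 0;
  // One initial attempt plus up to retryCount retries (assumption).
  while (attempts <= retryCount) {
    attempts += 1;
    try {
      handler();
      return { status: 'processed', attempts };
    } catch (error) {
      if (error instanceof NonRetryableErrorSketch) {
        break; // skip the remaining retries
      }
      // A real consumer would wait delayMs here before the next attempt.
    }
  }
  return { status: 'error-queue', attempts };
}
```

Under these assumptions, a handler that always throws an ordinary error with `retryCount` set to 5 is attempted six times before the message is routed to the error queue, while a handler that throws `NonRetryableErrorSketch` is attempted only once.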
In test environments (NODE_ENV=test), the module automatically mocks AMQP connections:
// my.service.spec.ts
import { Test } from '@nestjs/testing';
import { PublisherModule, EventBusPublisher } from '@vcita/event-bus-nestjs';
import { MyService } from './my.service';
describe('MyService', () => {
let service: MyService;
let eventBusPublisher: EventBusPublisher;
beforeEach(async () => {
const module = await Test.createTestingModule({
imports: [PublisherModule], // No configuration needed in tests
providers: [MyService],
}).compile();
service = module.get<MyService>(MyService);
eventBusPublisher = module.get<EventBusPublisher>(EventBusPublisher);
});
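it('should publish events', async () => {
// Sample inputs for the call under test
const userData = { email: 'user@example.com' };
const actor = { id: 'user-1', type: 'user' };
// Spy on publish so the call can be asserted even if it is not already a mock
jest.spyOn(eventBusPublisher, 'publish');
const user = await service.createUser(userData, actor);
// Verify the event was published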
expect(eventBusPublisher.publish).toHaveBeenCalledWith({
entityType: 'user',
eventType: 'created',
data: user,
actor: actor,
});
});
});
You can disable the event bus for testing by setting:
DISABLE_EVENT_BUS=true
See the Configuration section for detailed information about all environment variables.
@SubscribeTo({
domain: 'payments',
entity: 'product',
action: 'created',
queue: 'my-custom-queue',
queueOptions: {
durable: true,
arguments: {
'x-message-ttl': 3600000, // 1 hour TTL
},
},
errorQueueOptions: {
durable: true,
arguments: {
'x-message-ttl': 86400000, // 24 hour TTL for error queue
},
},
})
async handleProductCreated(/* ... */) {
// Your logic
}
@Injectable()
export class ProductSubscriber {
@SubscribeTo({
domain: 'payments',
entity: 'product',
action: 'created',
})
async handleProductCreated(/* ... */) {
// Handle creation
}
@SubscribeTo({
domain: 'payments',
entity: 'product',
action: 'updated',
})
async handleProductUpdated(/* ... */) {
// Handle updates
}
@SubscribeTo({
domain: 'payments',
entity: 'product',
action: 'deleted',
})
async handleProductDeleted(/* ... */) {
// Handle deletion
}
}
class EventBusPublisher<T = unknown> {
/**
* Publish an event to the event bus
*/
async publish(options: PublishEventOptions<T>): Promise<void>
}
interface PublishEventOptions<T = unknown> {
entityType: string; // Entity type (e.g., 'user', 'product')
eventType: EventType; // Event type (e.g., 'created', 'updated')
data: T; // Event payload
actor: Actor; // Actor information
version?: string; // Schema version (default: 'v1')
domain?: string; // Domain override
}
interface SubscribeToOptions {
domain: string | '*'; // Domain to listen to
entity: string | '*'; // Entity type to listen to
action: EventType; // Action to listen to
queue?: string; // Custom queue name
retry?: { // Retry configuration
count?: number;
delayMs?: number;
};
queueOptions?: object; // Queue options
errorQueueOptions?: object; // Error queue options
}
interface LegacySubscribeToOptions {
routingKey: string; // RabbitMQ routing key pattern
queue?: string; // Custom queue name
retry?: { // Retry configuration
count?: number;
delayMs?: number;
};
queueOptions?: object; // Queue options
errorQueueOptions?: object; // Error queue options
}
ISC
1.0.3 (2025-08-05)
DISABLE_EVENT_BUS environment variable to properly disable RabbitMQ module import in SubscriberModule, completing the event bus disable functionality