
@toxicoder/nestjs-kafka
A powerful and easy-to-use Kafka integration for NestJS applications.
Install it from npm:

npm install @toxicoder/nestjs-kafka
The NestJS Kafka module provides seamless integration with Apache Kafka for NestJS applications. It leverages the `kafkajs` package and enhances it with NestJS-specific features such as decorators, dependency injection, and lifecycle management.

The module can be configured through the following environment variables:
| Variable | Type | Default | Description |
|---|---|---|---|
| `KAFKA_BROKER` | `string[]` | `['localhost:9092']` | Comma-separated list of Kafka brokers |
| `KAFKA_CLIENT_ID` | `string` | `undefined` | Client ID for Kafka |
| `KAFKA_RETRY_COUNT` | `number` | `undefined` | Number of retries for Kafka operations |
| `KAFKA_RETRY_DELAY` | `number` | `undefined` | Initial retry delay in milliseconds |
| `KAFKA_RETRY_TIMEOUT` | `number` | `undefined` | Maximum retry time in milliseconds |
| `KAFKA_ENFORCE_TIMEOUT` | `boolean` | `undefined` | Whether to enforce request timeout |
| `KAFKA_CONNECTION_TIMEOUT` | `number` | `undefined` | Connection timeout in milliseconds |
| `KAFKA_REQUEST_TIMEOUT` | `number` | `undefined` | Request timeout in milliseconds |
| `KAFKA_TOPIC_AUTO_CREATE` | `boolean` | `false` | Whether to auto-create topics |
| `KAFKA_LOG_LEVEL` | `string` | `'error'` | Log level (`'nothing'`, `'error'`, `'warn'`, `'info'`, `'debug'`) |
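For reference, a hypothetical `.env` file using these variables might look like the following (the broker addresses and values are purely illustrative, not recommendations):

KAFKA_BROKER=broker-1:9092,broker-2:9092
KAFKA_CLIENT_ID=my-app
KAFKA_RETRY_COUNT=3
KAFKA_RETRY_DELAY=300
KAFKA_RETRY_TIMEOUT=30000
KAFKA_TOPIC_AUTO_CREATE=true
KAFKA_LOG_LEVEL=warn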
Use `forRoot` to configure the module with static options:
import { Module } from '@nestjs/common';
import { KafkaModule } from '@toxicoder/nestjs-kafka';

@Module({
  imports: [
    KafkaModule.forRoot({
      brokers: ['localhost:9092'],
      clientId: 'my-app',
      topicAutoCreate: true,
      retry: {
        retries: 3,
        initialRetryTime: 300,
        maxRetryTime: 30000,
      },
    }),
  ],
})
export class AppModule {}
Use `forRootAsync` for dynamic configuration, such as loading options from a configuration service.

You can set the `global` parameter to `true` to make the module global. When a module is global, you don't need to import it in other modules to use its providers. This is useful when you want to use the `KafkaService` across multiple modules without having to import the `KafkaModule` in each one.
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { KafkaModule } from '@toxicoder/nestjs-kafka';

@Module({
  imports: [
    ConfigModule.forRoot(),
    KafkaModule.forRootAsync({
      imports: [ConfigModule],
      inject: [ConfigService],
      global: true, // Makes the module global so you don't need to import it in other modules
      useFactory: (configService: ConfigService) => ({
        brokers: configService.get<string>('KAFKA_BROKERS').split(','),
        clientId: configService.get<string>('KAFKA_CLIENT_ID'),
        topicAutoCreate: configService.get<boolean>('KAFKA_TOPIC_AUTO_CREATE'),
        retry: {
          retries: configService.get<number>('KAFKA_RETRY_COUNT'),
          initialRetryTime: configService.get<number>('KAFKA_RETRY_DELAY'),
          maxRetryTime: configService.get<number>('KAFKA_RETRY_TIMEOUT'),
        },
      }),
    }),
  ],
})
export class AppModule {}
The `KafkaService` provides methods for interacting with Kafka. You need to initialize it in your service's `onModuleInit` method and clean up in `onModuleDestroy`:
import { Injectable, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
import { KafkaService } from '@toxicoder/nestjs-kafka';

@Injectable()
export class AppService implements OnModuleInit, OnModuleDestroy {
  constructor(private readonly kafkaService: KafkaService) {}

  async onModuleInit() {
    // Initialize Kafka connections
    await this.kafkaService.connect();
  }

  async onModuleDestroy() {
    // Clean up Kafka connections
    await this.kafkaService.disconnect();
  }
}
Use the `@KafkaConsumer` decorator to mark methods as Kafka message handlers:
import { Injectable } from '@nestjs/common';
import { KafkaConsumer, KafkaConsumerPayload } from '@toxicoder/nestjs-kafka';

@Injectable()
export class UserService {
  @KafkaConsumer('user-created', { groupId: 'user-service' })
  async handleUserCreated(payload: KafkaConsumerPayload) {
    const user = payload.message.value;
    console.log(`User created: ${user.name}`);
  }
}
You can also subscribe to multiple topics and pass additional consumer options, including manual acknowledgement:

import { Injectable } from '@nestjs/common';
import { KafkaConsumer, KafkaConsumerPayload } from '@toxicoder/nestjs-kafka';

@Injectable()
export class NotificationService {
  @KafkaConsumer(
    ['user-created', 'user-updated'],
    {
      groupId: 'notification-service',
      fromBeginning: true,
      autoCommit: false, // required for manual acknowledgement
      sessionTimeout: 30000,
      heartbeatInterval: 3000,
    },
  )
  async handleUserEvents(payload: KafkaConsumerPayload) {
    try {
      const user = payload.message.value;
      console.log(`Processing user event for: ${user.name}`);
      // Process the message
      // Manually acknowledge the message
      await payload.ack();
    } catch (error) {
      console.error('Error processing message:', error);
      // Don't ack the message, so it can be reprocessed
    }
  }
}
Use the `send` method to produce messages to Kafka topics:
import { Injectable } from '@nestjs/common';
import { KafkaService } from '@toxicoder/nestjs-kafka';

@Injectable()
export class UserService {
  constructor(private readonly kafkaService: KafkaService) {}

  async createUser(user: any) {
    // Save user to database
    // Send event to Kafka
    await this.kafkaService.send({
      topic: 'user-created',
      messages: {
        key: user.id,
        value: user,
        headers: {
          source: 'user-service',
          timestamp: Date.now().toString(),
        },
      },
    });
  }

  async updateUsers(users: any[]) {
    // Update users in database
    // Send multiple messages in one request
    await this.kafkaService.send({
      topic: 'user-updated',
      messages: users.map((user) => ({
        key: user.id,
        value: user,
      })),
    });
  }
}
You can explicitly ensure that topics exist before using them:
import { Injectable, OnModuleInit } from '@nestjs/common';
import { KafkaService } from '@toxicoder/nestjs-kafka';

@Injectable()
export class AppService implements OnModuleInit {
  constructor(private readonly kafkaService: KafkaService) {}

  async onModuleInit() {
    await this.kafkaService.init();

    // Ensure a single topic exists
    await this.kafkaService.ensureTopics('user-created');

    // Or ensure multiple topics exist
    await this.kafkaService.ensureTopics([
      'user-created',
      'user-updated',
      'user-deleted',
    ]);
  }
}
The `topicAutoCreate` option enables automatic creation of topics when they are needed but don't exist. When enabled:

- Topics are created with the default `numPartitions` and `replicationFactor` values from the Kafka broker configuration
- If the `replicationFactor` is greater than the number of available brokers, it is limited to the number of brokers

This feature is particularly useful in development environments or when you want to avoid manual topic creation.
To enable topic auto-creation:
KafkaModule.forRoot({
  brokers: ['localhost:9092'],
  clientId: 'my-app',
  topicAutoCreate: true,
});
Or via environment variable:
KAFKA_TOPIC_AUTO_CREATE=true
This module automatically handles JSON serialization and deserialization of Kafka messages:
When sending messages to Kafka using the `send` method:

- The `value` field of each message is automatically serialized using `JSON.stringify`
- Serialization happens in the `createMessage` method of the `KafkaService`
// Your original object
const user = { id: '123', name: 'John Doe', email: 'john@example.com' };

// When you send it:
await kafkaService.send({
  topic: 'user-created',
  messages: {
    key: user.id,
    value: user, // This object is automatically serialized to a JSON string
  },
});

// What actually gets sent to Kafka:
// key: '123'
// value: '{"id":"123","name":"John Doe","email":"john@example.com"}'
When consuming messages from Kafka, the message `value` is automatically deserialized using `JSON.parse` before it reaches your handler:
// What comes from Kafka:
// key: '123'
// value: '{"id":"123","name":"John Doe","email":"john@example.com"}'

// In your consumer handler:
@KafkaConsumer('user-created', { groupId: 'user-service' })
async handleUserCreated(payload: KafkaConsumerPayload) {
  const user = payload.message.value;
  // user is already a parsed object: { id: '123', name: 'John Doe', email: 'john@example.com' }
  console.log(`User created: ${user.name}`);
}
This automatic serialization/deserialization allows you to work directly with JavaScript objects without having to manually handle JSON conversion in your application code.
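Because values arrive as parsed objects, a common pattern is to describe the expected event shape with an application-level type and cast the value in the handler. The sketch below assumes a hypothetical `UserCreatedEvent` interface (it is not part of the package), and the cast is unchecked, so validate the payload at runtime if the producer is not under your control:

import { Injectable } from '@nestjs/common';
import { KafkaConsumer, KafkaConsumerPayload } from '@toxicoder/nestjs-kafka';

// Hypothetical application-level event shape (not provided by the package)
interface UserCreatedEvent {
  id: string;
  name: string;
  email: string;
}

@Injectable()
export class UserCreatedHandler {
  @KafkaConsumer('user-created', { groupId: 'user-service' })
  async handleUserCreated(payload: KafkaConsumerPayload) {
    // The module has already parsed the JSON value; the cast only asserts
    // the expected shape, it does not validate it at runtime.
    const user = payload.message.value as UserCreatedEvent;
    console.log(`User created: ${user.name} <${user.email}>`);
  }
}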
This project is licensed under the ISC License.