A progressive Node.js framework for building efficient and scalable server-side applications.
Description
Kafka module for Nest.
Installation
$ npm i --save @kimtuan1102/nestjs-kafka @kafkajs/confluent-schema-registry @nestjs/microservices kafkajs rxjs
Synchronous Module Initialization
Register the KafkaModule synchronously with the register() method:
// Example: registers a single Kafka client with static, inline options.
@Module({
imports: [
KafkaModule.register([
{
// Token under which the client is registered; the Consumer/Producer examples
// in this document inject it with @Inject('HERO_SERVICE').
name: 'HERO_SERVICE',
options: {
// NOTE(review): `client` and `consumer` are presumably forwarded to KafkaJS — confirm.
client: {
clientId: 'hero',
brokers: ['localhost:9092'],
},
consumer: {
groupId: 'hero-consumer'
}
}
},
]),
]
...
})
Asynchronous Module Initialization
Register the KafkaModule asynchronously with the registerAsync() method:
import { ConfigModule, ConfigService } from '@nestjs/config';
// Example: registers the 'HERO_SERVICE' Kafka client asynchronously. The
// factory receives the injected ConfigService and returns the module options.
@Module({
  imports: [
    ConfigModule.forRoot(),
    KafkaModule.registerAsync(['HERO_SERVICE'], {
      // `configService` is a plain factory argument supplied through `inject`
      // below, so it is used directly rather than read from `this`.
      useFactory: async (configService: ConfigService) => {
        const broker = configService.get('broker');
        return [
          {
            name: 'HERO_SERVICE',
            options: {
              // Same shape as the synchronous example: connection settings
              // live under `client`, next to `consumer`.
              client: {
                clientId: 'hero',
                brokers: [broker],
              },
              consumer: {
                groupId: 'hero-consumer'
              }
            }
          }
        ];
      },
      inject: [ConfigService]
    })
  ]
  ...
})
Asynchronous Module Initialization using config service
import { KafkaModuleOption, KafkaOptionsFactory } from '../kafka';
import { KAFKA_INTEGRATION_SERVICE } from '../common/constants';
/**
 * Options factory that supplies the configuration for the 'HERO_SERVICE'
 * Kafka client.
 *
 * NOTE(review): `clientId` and `brokers` are not defined in this snippet; they
 * must be provided by the surrounding code (e.g. read from configuration).
 */
export class KafkaConfigService implements KafkaOptionsFactory {
  /**
   * Builds the list of Kafka module options.
   *
   * NOTE(review): the method name is spelled exactly as in the original
   * example; it has to match whatever KafkaOptionsFactory declares — confirm.
   *
   * @returns a single-entry array describing the 'HERO_SERVICE' client
   */
  creatKafkaModuleOptions(): KafkaModuleOption[] {
    const client = { clientId: clientId, brokers: brokers };
    // The consumer group id reuses the client id.
    const consumer = { groupId: clientId };
    return [{ name: 'HERO_SERVICE', options: { client, consumer } }];
  }
}
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { KafkaModule } from '@kimtuan1102/nestjs-kafka';
// Root module: registers the 'HERO_SERVICE' Kafka client asynchronously, with
// its options produced by the KafkaConfigService class.
// ConfigModule and KafkaConfigService must also be imported into this file;
// those import lines are not shown in this snippet.
@Module({
  imports: [
    KafkaModule.registerAsync(['HERO_SERVICE'], {
      imports: [ConfigModule],
      useClass: KafkaConfigService,
    }),
  ],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {}
Full settings can be found:
Subscribing
Subscribe to a topic to receive messages.
// Example consumer: handles messages from the 'hero.kill.dragon' topic using
// the Kafka client registered under the 'HERO_SERVICE' token.
export class Consumer {
constructor(
@Inject('HERO_SERVICE') private client: KafkaService
) {}
// Registers this instance with the client for the 'hero.kill.dragon' topic.
// NOTE(review): presumably invoked by Nest as the OnModuleInit lifecycle hook — confirm.
onModuleInit(): void {
this.client.subscribeToResponseOf('hero.kill.dragon', this)
}
// Handler marked with @SubscribeTo for 'hero.kill.dragon'; its body is elided in this example.
@SubscribeTo('hero.kill.dragon')
async getWorld(@Payload() data: KafkaMessage): Promise<void> {
...
}
}
Producing
Send messages back to Kafka.
// Topic that the example producer publishes to.
const TOPIC_NAME = 'hero.kill.dragon';

/**
 * Example producer: publishes messages through the Kafka client registered
 * under the 'HERO_SERVICE' token.
 */
export class Producer {
  constructor(
    @Inject('HERO_SERVICE') private client: KafkaService
  ) {}

  /**
   * Sends one record with key '1' to TOPIC_NAME.
   *
   * @param message value of the record; defaults to 'Hello world'
   * @returns the value that `client.send` resolves with
   */
  async post(message: string = 'Hello world'): Promise<RecordMetadata[]> {
    const record = { key: '1', value: message };
    const metadata = await this.client.send({
      topic: TOPIC_NAME,
      messages: [record]
    });
    return metadata;
  }
}
Support
Nest is an MIT-licensed open source project. It can grow thanks to the sponsors and the support of the amazing backers. If you'd like to join them, please read more here.
Stay in touch
License
Nest is MIT licensed.