
Product
Introducing Webhook Events for Alert Changes
Add real-time Socket webhook events to your workflows to automatically receive software supply chain alert changes in real time.
@kafka-ts/nestjs-consumer
Advanced tools
NestJS's Dynamic Module for Consumer.
npm install --save @kafka-ts/nestjs-consumer
# or
yarn add @kafka-ts/nestjs-consumer
# or
pnpm add @kafka-ts/nestjs-consumer
// main.ts
import { NestFactory } from '@nestjs/core';
import { KafkaConsumer } from '@kafka-ts/nestjs-consumer';
import { AppModule } from 'app.module';
async function bootstrap() {
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
AppModule,
{
strategy: new KafkaConsumer({
brokers: ['localhost:9092'],
consumerOptions: {
groupId: 'test-id',
},
}),
},
);
await app.listen(3_000);
}
bootstrap();
// main.ts
import { NestFactory } from '@nestjs/core';
import { KafkaConsumer } from '@kafka-ts/nestjs-consumer';
import { AppModule } from 'app.module';
async function bootstrap() {
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
AppModule,
KafkaConsumer.createService({
brokers: ['localhost:9092'],
consumerOptions: {
groupId: 'test-id',
},
}),
);
await app.listen(3_000);
}
bootstrap();
// main.ts
import { NestFactory } from '@nestjs/core';
import { KafkaConsumer } from '@kafka-ts/nestjs-consumer';
import { AppModule } from 'app.module';
async function bootstrap() {
const app = await NestFactory.create(AppModule);
app.connectMicroservice(
KafkaConsumer.createService({
brokers: ['localhost:9092'],
consumerOptions: {
groupId: 'test-id',
},
}),
);
await app.startAllMicroservices();
await app.listen(3_000);
}
bootstrap();
// consumer.controller.ts
import { Controller } from '@nestjs/common';
import {
Ctx,
Payload,
Subscribe,
SubscribeMessage,
KafkaMessageContext,
KafkaBatchMessageContext,
} from '@kafka-ts/nestjs-consumer';
@Controller()
export class ConsumerController {
@Subscribe({
topics: ['topic'],
})
public async handleSubscribe(
@Payload() data: string[],
@Ctx() context: KafkaBatchMessageContext,
): Promise<string> {
console.log('data', data);
console.log('context.batch', context.batch);
return 'Ok!';
}
}
clientId// main.ts
import { NestFactory } from '@nestjs/core';
import { KafkaConsumer } from '@kafka-ts/nestjs-consumer';
import { AppModule } from 'app.module';
async function bootstrap() {
const app = await NestFactory.create(AppModule);
app.connectMicroservice(
KafkaConsumer.createService([
{
brokers: ['localhost:9092'],
consumerOptions: {
groupId: 'test-id',
},
},
{
clientId: 'test-client',
brokers: ['localhost:9092'],
consumerOptions: {
groupId: 'test-id-2',
},
},
]),
);
await app.startAllMicroservices();
await app.listen(3_000);
}
bootstrap();
// consumer.controller.ts
import { Controller } from '@nestjs/common';
import {
Ctx,
Payload,
Subscribe,
SubscribeMessage,
KafkaMessageContext,
KafkaBatchMessageContext,
} from '@kafka-ts/nestjs-consumer';
@Controller()
export class ConsumerController {
@Subscribe({
topics: ['topic'],
})
public async handleSubscribe(
@Payload() data: string[],
@Ctx() context: KafkaBatchMessageContext,
): Promise<string> {
console.log('data', data);
console.log('context.batch', context.batch);
return 'Ok!';
}
@Subscribe({
clientId: 'test-client',
topics: ['topic_2'],
})
public async handleSubscribeTestClient(
@Payload() data: string[],
@Ctx() context: KafkaBatchMessageContext,
): Promise<string> {
console.log('data', data);
console.log('context.batch', context.batch);
return 'Ok!';
}
}
If you wanna subscribe eachBatch and eachMessage, you should define two topic in different clientId or else only eachBatch or eachMessage can run.
// main.ts
import detect from 'detect-port';
import { NestFactory } from '@nestjs/core';
import { KafkaConsumer } from '@kafka-ts/nestjs-consumer';
import { AppModule } from './app.module';
async function bootstrap() {
const app = await NestFactory.create(AppModule);
app.connectMicroservice(
KafkaConsumer.createService([
{
brokers: ['localhost:9092'],
consumerOptions: {
groupId: 'test-id',
},
},
{
clientId: 'test-client',
brokers: ['localhost:9092'],
consumerOptions: {
groupId: 'test-id-2',
},
},
]),
);
const port = await detect(3_000);
await app.startAllMicroservices();
await app.listen(port);
console.log(`Run on ${port}`);
}
bootstrap();
// consumer.controller.ts
import { Controller } from '@nestjs/common';
import {
Ctx,
Payload,
Subscribe,
SubscribeMessage,
KafkaMessageContext,
KafkaBatchMessageContext,
} from '@kafka-ts/nestjs-consumer';
@Controller()
export class ConsumerController {
@Subscribe({
topics: ['topic'],
})
public async handleSubscribe(
@Payload() data: string[],
@Ctx() context: KafkaBatchMessageContext,
): Promise<string> {
console.log('data', data);
console.log('context.batch', context.batch);
return 'Ok!';
}
@SubscribeMessage({
clientId: 'test-client',
topics: ['topic_2'],
})
public async handleSubscribeMessage(
@Payload() data: string,
@Ctx() context: KafkaMessageContext,
): Promise<string> {
console.log('data', data);
console.log('context.message', context.message);
return 'Ok!';
}
}
FAQs
NestJS package for consumer
We found that @kafka-ts/nestjs-consumer demonstrated a not healthy version release cadence and project activity because the last version was released a year ago. It has 1 open source maintainer collaborating on the project.
Did you know?

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Product
Add real-time Socket webhook events to your workflows to automatically receive software supply chain alert changes in real time.

Security News
ENISA has become a CVE Program Root, giving the EU a central authority for coordinating vulnerability reporting, disclosure, and cross-border response.

Product
Socket now scans OpenVSX extensions, giving teams early detection of risky behaviors, hidden capabilities, and supply chain threats in developer tools.