Socket
Socket
Sign inDemoInstall

@buka/nestjs-kafka

Package Overview
Dependencies
209
Maintainers
1
Versions
4
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

    @buka/nestjs-kafka

NestJS module supporting multiple connections implemented using KafkaJS


Version published
Maintainers
1
Install size
786 kB
Created

Changelog

Source

1.1.2 (2024-04-20)

Bug Fixes

  • deprecate package (e1c7629)

Readme

Source

@buka/nestjs-kafka

version downloads dependencies license Codecov

Warning!!! This packages had be deprecated. Nestjs microservice has fully implemented this function. There is no reason to maintain this package.

This is a nestJS module implemented using KafkaJS. That support multiple connections and fits the coding style of nestjs.

Usage

Import KafkaModule.forRoot to AppModule:

// app.module.js
import { Module } from "@nestjs/common";
import { KafkaModule } from "@buka/nestjs-kafka";

@Module({
  imports: [
    KafkaModule.forRoot({
      name: "my-kafka",
      groupId: "my-group-id",
      clientId: "my-client-id",
      brokers: ["my_kafka_host:9092"],
    }),
  ],
})
export class AppModule {}

KafkaConsumer

Create a provider named AppConsumer that consume messages:

// app.consumer.js
@Injectable()
@KafkaConsumer()
export class AppConsumer {
  @KafkaConsume("my-topic")
  async finishTask(@KafkaMessage() message: string): Promise<void> {
    // do something
    console.log(message);
  }

  @KafkaConsume("other-topic", { json: true })
  async finishTask(
    @KafkaMessage() message: Record<string, any>
  ): Promise<void> {
    // do something
    console.log(message);
  }
}

AppConsumer and AppService can be merged into one provider, but writing them separately will make the code clearer.

Then, append AppConsumer to AppModule:

import { Module } from "@nestjs/common";
import { AppConsumer } from "./app.consumer";

@Module({
  imports: [
    /* ... */
  ],
  providers: [AppConsumer],
})
export class AppModule {}

KafkaProducer

KafkaProducer will connect on module init and disconnect on module destroy. To use this, import KafkaModule.forProducer(options) to AppModule:

// app.module.js
import { Module } from "@nestjs/common";
import { KafkaModule, Partitioners } from "@buka/nestjs-kafka";
import AppService from "./app.service";

@Module({
  imports: [
    KafkaModule.forRoot({
      name: "my-kafka",
      groupId: "my-group-id",
      clientId: "my-client-id",
      brokers: ["my_kafka_host:9092"],
    }),
    KafkaModule.forProducer({
      name: "my-kafka",
      createPartitioner: Partitioners.LegacyPartitioner,
    }),
  ],
  provider: [AppService],
})
export class AppModule {}

The options of .forProducer is exactly the same as the options of kafka.producer in KafkaJS

Inject KafkaProducer to your AppService:

// app.service.js
@Injectable()
export class AppService {
  constructor(
    @InjectKafkaProducer('my-kafka')
    private readonly producer: KafkaProducer
  ) {}

  async sendMessage() {
    this.producer.send({
      topic: 'kafka-topic'
      messages: [{ value: 'Hello Kafka' }]
    })
  }
}

The .send function of KafkaProducer is exactly the same as the .send function of KafkaJS

KafkaService

Using the KafkaService, you can create consumer and producer like plain KafkaJS.

// app.service.js
import { OnModuleDestroy, OnModuleInit } from "@nestjs/common";
import { Producer, ProducerRecord, RecordMetadata } from "kafkajs";
import { KafkaService } from "@buka/nestjs-kafka";

@Injectable()
export class AppService implements OnModuleInit, OnModuleDestroy {
  producer!: Producer;
  consumer!: Consumer;

  constructor(private readonly kafka: KafkaService) {}

  async onModuleInit(): Promise<void> {
    this.producer = this.kafka.producer();
    await this.producer.connect();

    this.consumer = this.kafka.consumer({
      groupId: "my-group-id",
    });

    this.consumer.subscribe({ topic: "kafka-topic" });
    this.consumer.run({
      eachMessage: async (context) => {
        // do somethings
      },
    });
  }

  async onModuleDestroy(): Promise<void> {
    await this.producer.disconnect();
    await this.consumer.disconnect();
  }
}

Q&A

KafkaConsumer not working with CreateRequestContext of mikro-orm

If you don't pay attention to the order of CreateRequestContext decorators, you may have problems with any of other method decorators, not only @buka/nestjs-kafka.

import { Injectable } from "@nestjs/common";
import { KafkaConsumer, KafkaConsume, KafkaMessage } from "@buka/nestjs-kafka";
import { CreateRequestContext } from "@mikro-orm/mysql";

// app.consumer.js
@Injectable()
@KafkaConsumer()
export class AppConsumer {
  @CreateRequestContext()
  // !! KafkaConsume decorator will not work !!
  @KafkaConsume("my-topic")
  async finishTask(@KafkaMessage() message: string): Promise<void> {
    console.log(message);
  }
}

There are two solutions:

  1. [recommend] written as two functions:

    @Injectable()
    @KafkaConsumer()
    export class AppConsumer {
      @KafkaConsume("my-topic")
      async consumeMessage(@KafkaMessage() message: string): Promise<void> {
        // ... filter and format message
        this.finishTask(JSON.parse(message))
      }
    
      @CreateRequestContext()
      async finishTask(task: Task): Promise<void> {
        // do something
        console.log(task);
      }
    
  2. Pay attention to the order of CreateRequestContext:

    @Injectable()
    @KafkaConsumer()
    export class AppConsumer {
      @KafkaConsume("my-topic")
      // use CreateRequestContext as the last decorator
      @CreateRequestContext()
      async finishTask(@KafkaMessage() message: string): Promise<void> {
        // do something
        console.log(message);
      }
    }
    

Keywords

FAQs

Last updated on 20 Apr 2024

Did you know?

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Install

Related posts

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc