@scaliolabs/nestjs-kafka

Overview

@scaliolabs/nestjs-kafka is a Kafka wrapper around kafkajs that adds Kafka support to NestJS following the Orchestrator and Explorer pattern. It provides functionality for integrating with Kafka, a distributed streaming platform, with minimal boilerplate code.

Features

  • @ListenTopic: Processes a Kafka topic with your function.
  • KafkaProducerService: Sends messages to Kafka topics.

All kafkajs configuration flags are still exposed, allowing you to customize your Kafka initialization without the package getting in your way.

Installation

To begin, install the required dependencies:

yarn add @scaliolabs/nestjs-kafka kafkajs

Configuring the Module

// app.module.ts

import { KafkaModule } from '@scaliolabs/nestjs-kafka'

// Basic config
const config: IIKafkaConfig = {
	brokers: ['127.0.0.1:9092'],
	clientId: 'vitamin-seats',
	groupId: 'my-group-id',
}

// Customized config
const config: IIKafkaConfig = {
	brokers: ['127.0.0.1:9092'],
	clientId: 'vitamin-seats',
	groupId: 'my-group-id',
	consumerConfig: {
		allowAutoTopicCreation: true,
	},
	producerConfig: {
		allowAutoTopicCreation: true,
	},
	kafkaConfig: {
		requestTimeout: 5000,
	},
}

@Module({
	imports: [
		KafkaModule.register(config),
	],
})
export class AppModule {}

Additional options can be found in the KafkaJS documentation.

Setup Typings

As a best practice, we suggest keeping the messages you pass to Kafka and your topics strongly typed, so we encourage you to set up your typings and integrate them with our package namespace.

/**
 * Topics
 */

export enum TopicsEnum {
	MY_TOPIC = 'my-topic',
}

/**
 * Message Types
 */

export type IMyMessage = { name: string }

Producing Messages

To send messages to Kafka, inject the KafkaProducerService into your service constructor, create a KafkaPayload, and send it.

The sendMessage function accepts one or multiple messages.

@Injectable()
export class MyService {
	constructor(private readonly producer: KafkaProducerService) {}

	someMethod() {
		const message = KafkaPayload.create<IMyMessage>({
			body: { name: 'Franz' },
		})

		this.producer.sendMessage(TopicsEnum.MY_TOPIC, message)
	}
}

When creating a message, you can provide some advanced options to tailor your usage to specific needs. All of this information will also be available when consuming that same message.

Property | Type | Description
body | T | The message payload of generic type T.
messageType (optional) | string | A string representing the type of the message. Useful if you want to have different types of messages in the same topic.
key (optional) | string | A string representing the key of the message. Used to guarantee that all messages with the same key are sent to the same partition.
partition (optional) | number | The partition number where the message will be sent.
headers (optional) | Record<string, any> | Key-value pairs representing the headers of the message. Used to add metadata to the message.
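
Building on the table above, here is a sketch of a payload that uses these options (the messageType, key, and header values are illustrative, not part of the package's API):

```typescript
const message = KafkaPayload.create<IMyMessage>({
	body: { name: 'Franz' },
	messageType: 'user.created', // distinguish message kinds within one topic
	key: 'user-42', // messages with the same key land on the same partition
	headers: { 'x-correlation-id': 'abc-123' }, // metadata, e.g. for tracing
})

this.producer.sendMessage(TopicsEnum.MY_TOPIC, message)
```

Because the key controls partition assignment, keying by an entity id (here, a user id) preserves per-entity message ordering.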

Consuming Messages

To consume messages, annotate your class with @KafkaListener() and your method with the decorator @ListenTopic(topicName, options). Your function will then receive two parameters: an IKafkaConsumedMessage and an IKafkaContext, which can be used to further interact with your message processing.

@Injectable()
@KafkaListener()
export class MyService {
	constructor(private readonly producer: KafkaProducerService) {}

	@ListenTopic('my-topic')
	async processMyTopic({ body }: IKafkaConsumedMessage<IMyMessage>, context: IKafkaContext) {
		console.log(body.name)
	}
}

Decorator Options

The IKafkaDecoratorOptions type defines the configuration options for a Kafka decorator. This interface is used to specify various settings related to Kafka topic consumption.

Property | Type | Description
fromBeginning | boolean | If set to true, the consumer will start reading messages from the beginning of the topic. Defaults to false.
useInstanceConsumerGroup | boolean | If set to true, the consumer will use an instance-specific consumer group. Defaults to false.
maxRetries | number | Number of retries this consumer should do. Set to 0 to disable retries. Defaults to 0.
retryFactor | number | Factor the retry timeout is multiplied by on each retry. Defaults to 2.
retryMinTimeout | number | Minimum time in milliseconds to wait before retrying. Defaults to 1000ms.
retryMaxTimeout | number | Maximum time in milliseconds to wait before retrying. Defaults to 5000ms.
retryRandomize | boolean | Randomize the retry timeout with a number between 1 and 2. Defaults to true.

The decorator also accepts all properties from the original kafkajs message-consumption implementation.
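
As a rough illustration of how the retry options above combine into a delay schedule, consider this sketch. The formula is an assumption (a standard exponential backoff), not taken from the package source, and retryDelay is a hypothetical helper; randomization (retryRandomize) is omitted:

```typescript
// Hypothetical helper illustrating an exponential-backoff schedule
// built from the decorator defaults (formula is assumed, not from the package).
function retryDelay(
	attempt: number, // 0-based retry attempt
	minTimeout = 1000, // retryMinTimeout default
	maxTimeout = 5000, // retryMaxTimeout default
	factor = 2, // retryFactor default
): number {
	// Each retry multiplies the previous delay by `factor`,
	// clamped to `maxTimeout`.
	return Math.min(maxTimeout, minTimeout * Math.pow(factor, attempt))
}

// With the defaults, successive retries wait 1000ms, 2000ms, 4000ms,
// then stay clamped at 5000ms.
```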

KafkaContext Interface

The IKafkaContext interface defines the structure for Kafka context objects. It includes the following properties and methods:

Property | Type | Description
topic | string | The name of the Kafka topic.
partition | number | The partition number within the Kafka topic.
heartbeat | () => void | A method to send a heartbeat signal. Useful for long-running processors to signal to Kafka that you're still processing; otherwise Kafka might remove your consumer from the pool.
pause | () => () => void | A method to pause the Kafka consumer. Returns another function that can be called to resume the consumer.
retryAttempt | number | The retry count of this processing. On the first execution, this value is 0.
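
For long-running work, the context might be used like this (a sketch; loadChunks and processChunk are hypothetical helpers, not part of the package):

```typescript
@ListenTopic('my-topic')
async processMyTopic({ body }: IKafkaConsumedMessage<IMyMessage>, context: IKafkaContext) {
	for (const chunk of loadChunks(body)) { // hypothetical
		await processChunk(chunk) // hypothetical
		// Signal Kafka that we're still working so the consumer
		// isn't evicted from the group during a long job.
		context.heartbeat()
	}
}
```

If you need backpressure instead, context.pause() stops consumption and returns a function you can call later to resume it.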

DLQ Support

Dead Letter Queue (DLQ) support allows you to handle messages that cannot be processed successfully. When a message fails to be processed after a certain number of retries, it can be sent to a DLQ for further inspection and handling. This ensures that problematic messages do not block the processing of other messages and provides a mechanism for dealing with errors in a controlled manner.

To enable DLQ support, you need to configure the KafkaModule with DLQ settings.

Example configuration:

const config: IIKafkaConfig = {
	dlqTopic: (originalTopic: string, messageType?: string) => 'my-dlq-topic',
	// ...other config options go here
}

In this example, messages that fail to process will be sent to the my-dlq-topic for further analysis and handling.

Since dlqTopic is a function, you can add conditional logic to decide which DLQ you want to send based on your topic name or message type.
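
For example, a routing function along these lines (the topic names and the 'payment' message type are made up for illustration):

```typescript
// Route failed messages to a DLQ based on the original topic and,
// optionally, the message type. All names here are illustrative.
const dlqTopic = (originalTopic: string, messageType?: string): string => {
	if (messageType === 'payment') {
		// Failed payment messages get a dedicated queue.
		return 'payments-dlq'
	}
	// Default: derive the DLQ name from the source topic.
	return `${originalTopic}-dlq`
}
```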

Future Ideas

License

This project is licensed under the MIT License. See the LICENSE file for details.

Support

If you encounter any issues or have questions, feel free to open an issue on our GitHub repository.

Acknowledgements

We would like to thank the contributors of KafkaJS for their excellent work on the Kafka client for Node.js.

Contact

For more information, please contact us at support@scaliolabs.com.

Package last updated on 13 Nov 2024
