@scaliolabs/nestjs-kafka
A Kafka wrapper around `kafkajs` that adds Kafka support to NestJs following the `Orchestrator and Explorer` pattern, with minimum boilerplate code needed.
The @scaliolabs/nestjs-kafka package is a Kafka wrapper around kafkajs that adds Kafka support to NestJS following the Orchestrator and Explorer pattern. It provides functionality for integrating with Kafka, a distributed streaming platform, with minimal boilerplate. All kafkajs configuration flags are still exposed, allowing you to customize your Kafka initialization without the package getting in your way.
To begin, install the required dependencies:
yarn add @scaliolabs/nestjs-kafka kafkajs
// app.module.ts
import { KafkaModule } from '@scaliolabs/nestjs-kafka'
// Basic Config
const config: IKafkaConfig = {
  brokers: ['127.0.0.1:9092'],
  clientId: 'vitamin-seats',
  groupId: 'my-group-id',
}
// Customized Config
const config: IKafkaConfig = {
  brokers: ['127.0.0.1:9092'],
  clientId: 'vitamin-seats',
  groupId: 'my-group-id',
  consumerConfig: {
    allowAutoTopicCreation: true,
  },
  producerConfig: {
    allowAutoTopicCreation: true,
  },
  kafkaConfig: {
    requestTimeout: 5000,
  },
}
@Module({
  imports: [
    KafkaModule.register(config),
  ],
})
export class AppModule {}
Additional options can be found in the KafkaJS documentation.
As a best practice, we suggest keeping the messages you pass to Kafka and your topics strongly typed, so we encourage you to set up your typings and integrate them with our package namespace.
/**
 * Topics
 */
export enum TopicsEnum {
  MY_TOPIC = 'my-topic',
}

/**
 * Message Types
 */
export type IMyMessage = { name: string }
To send messages to Kafka, inject the KafkaProducerService into your service constructor, create a KafkaPayload, and send it. The sendMessage function accepts one or multiple messages.
@Injectable()
export class MyService {
  constructor(private readonly producer: KafkaProducerService) {}

  someMethod() {
    const message = KafkaPayload.create<IMyMessage>({
      body: { name: 'Franz' },
    })

    this.producer.sendMessage(TopicsEnum.MY_TOPIC, message)
  }
}
When creating a message, you can provide some advanced options to tailor your usage to specific needs. All of this information will also be available when consuming that same message.
Property | Type | Description |
---|---|---|
body | T | The message payload of generic type T. |
messageType (optional) | string | A string representing the type of the message. This is useful if you want to have different types of messages in the same topic. |
key (optional) | string | A string representing the key of the message. This is used to guarantee that all messages with the same key will be sent to the same partition. |
partition (optional) | number | A number indicating the partition the message will be sent to. |
headers (optional) | Record<string, any> | An object containing key-value pairs representing the headers of the message. This is used to add metadata to the message. |
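The options above can be sketched as a plain object. Note that the PayloadOptions interface below is a local stand-in mirroring the documented fields, not the package's exported type, and the messageType, key, and header values are made up for illustration:

```typescript
// Local stand-in mirroring the documented payload fields; the real type
// comes from @scaliolabs/nestjs-kafka.
interface PayloadOptions<T> {
  body: T
  messageType?: string
  key?: string
  partition?: number
  headers?: Record<string, any>
}

const payload: PayloadOptions<{ name: string }> = {
  body: { name: 'Franz' },
  messageType: 'user-created', // distinguish message kinds within one topic
  key: 'user-42',              // same key → same partition → ordered delivery
  headers: { source: 'my-service' }, // free-form metadata
}

console.log(payload.key)
```

Setting a stable key (here, a per-user identifier) is the usual way to keep all events for one entity in order, since Kafka only guarantees ordering within a partition.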
To consume messages, you just need to annotate your class with @KafkaListener() and your method with the @ListenTopic(topicName, options) decorator. Your function will then receive two parameters, an IKafkaConsumedMessage and an IKafkaContext, which can be used to further interact with your message processing.
@Injectable()
@KafkaListener()
export class MyService {
  constructor(private readonly producer: KafkaProducerService) {}

  @ListenTopic('my-topic')
  async processMyTopic({ body }: IKafkaConsumedMessage<IMyMessage>, context: IKafkaContext) {
    console.log(body.name)
  }
}
The IKafkaDecoratorOptions type defines the configuration options accepted by the @ListenTopic() decorator. This interface is used to specify various settings related to Kafka topic consumption.
Property | Type | Description |
---|---|---|
fromBeginning | boolean | If set to true, the consumer will start reading messages from the beginning of the topic. Defaults to false. |
useInstanceConsumerGroup | boolean | If set to true, the consumer will use an instance-specific consumer group. Defaults to false. |
maxRetries | number | Number of retries this consumer should perform. Set to 0 to disable retries. Defaults to 0. |
retryFactor | number | Factor the retry timeout is multiplied by on each retry. Defaults to 2. |
retryMinTimeout | number | Minimum time in milliseconds to wait before retrying. Defaults to 1000 ms. |
retryMaxTimeout | number | Maximum time in milliseconds to wait before retrying. Defaults to 5000 ms. |
retryRandomize | boolean | Randomize the retry timeout by a factor between 1 and 2. Defaults to true. |
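The retry timing options describe a standard exponential backoff. As an illustration only (the actual scheduling is internal to the package), the delay before a given attempt under the documented defaults would look like this, ignoring retryRandomize for clarity:

```typescript
// Illustrative exponential backoff with the documented defaults:
// retryFactor 2, retryMinTimeout 1000 ms, retryMaxTimeout 5000 ms.
function retryDelay(
  attempt: number,
  factor = 2,
  minTimeout = 1000,
  maxTimeout = 5000,
): number {
  // Grow the delay geometrically, but never beyond the configured ceiling.
  return Math.min(maxTimeout, minTimeout * Math.pow(factor, attempt))
}

console.log(retryDelay(0)) // 1000
console.log(retryDelay(1)) // 2000
console.log(retryDelay(3)) // 5000 (capped at retryMaxTimeout)
```

With retryRandomize enabled, each of these delays would additionally be multiplied by a random factor between 1 and 2, which helps avoid many consumers retrying in lockstep.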
The decorator also accepts all properties from the original kafkajs message-consumption implementation.
The IKafkaContext interface defines the structure of the Kafka context object. It includes the following properties and methods:
Property | Type | Description |
---|---|---|
topic | string | The name of the Kafka topic. |
partition | number | The partition number within the Kafka topic. |
heartbeat | () => void | A method to send a heartbeat signal. It's useful for long-running processors to signal to Kafka that you're still processing; otherwise Kafka might remove your consumer from the group. |
pause | () => () => void | A method to pause the Kafka consumer. This method returns another function that can be called to resume the consumer. |
retryAttempt | number | Number indicating the retry count of this processing. On the first execution, this value is 0. |
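A minimal sketch of how a long-running handler might use heartbeat and pause. The context object here is a hand-rolled stand-in mirroring the documented IKafkaContext shape (so the snippet is self-contained), not the object the package actually passes in:

```typescript
// Stand-in for the documented IKafkaContext shape.
interface KafkaContextLike {
  topic: string
  partition: number
  heartbeat: () => void
  pause: () => () => void
}

let beats = 0
let paused = false
const ctx: KafkaContextLike = {
  topic: 'my-topic',
  partition: 0,
  heartbeat: () => { beats += 1 },       // signal "still processing" to Kafka
  pause: () => {                          // pausing returns a resume function
    paused = true
    return () => { paused = false }
  },
}

// Simulated long-running processor: heartbeat between chunks of work so the
// consumer isn't evicted from the group, pause while waiting on a slow
// dependency, then resume once it recovers.
function processSlowly(context: KafkaContextLike): void {
  for (let chunk = 0; chunk < 3; chunk++) {
    context.heartbeat()
  }
  const resume = context.pause()
  resume()
}

processSlowly(ctx)
```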
Dead Letter Queue (DLQ) support allows you to handle messages that cannot be processed successfully. When a message fails to be processed after a certain number of retries, it can be sent to a DLQ for further inspection and handling. This ensures that problematic messages do not block the processing of other messages and provides a mechanism for dealing with errors in a controlled manner.
To enable DLQ support, you need to configure the KafkaModule with DLQ settings.
Example configuration:
const config: IKafkaConfig = {
  dlqTopic: (originalTopic: string, messageType?: string) => 'my-dlq-topic',
  // ...other config options go here
}
In this example, messages that fail to process will be sent to my-dlq-topic for further analysis and handling. Since dlqTopic is a function, you can add conditional logic to decide which DLQ to send to based on the topic name or message type.
This project is licensed under the MIT License. See the LICENSE file for details.
If you encounter any issues or have questions, feel free to open an issue on our GitHub repository.
We would like to thank the contributors of KafkaJS for their excellent work on the Kafka client for Node.js.
For more information, please contact us at support@scaliolabs.com.