Kafka Connection Manager
Description
Kafka Manager is a wrapper around KafkaJS for the Digital Twin project.
It is intended to be publicly available from npm.
How to use it
Example: creating and initializing an instance
import KafkaManager, { ProducerRecord, KafkaMessage } from ".";
/** Example payload type: one of the two value shapes the consumer handlers below accept. */
interface RoadData {
  name: string;
  age: number;
}
/** Example payload type: the second value shape the consumer handlers below accept. */
interface HobbitData {
  name: string;
  address: string;
}
/**
 * End-to-end usage example for KafkaManager.
 *
 * `init()` creates a manager, a producer and a consumer, publishes the same
 * two-message record to the `traffic-loop` topic every 5 seconds, and
 * subscribes the consumer to that topic twice: once with a per-message
 * handler and once with a per-batch handler.
 */
export default class Example {
  /** Assigned in `init()`; the `!` tells the compiler it is set before use. */
  kafka!: KafkaManager;

  /**
   * Wires up the producer, the periodic publisher and both subscriptions.
   * Resolves once the producer and consumer have been created and the
   * subscribe calls have been issued (the subscribe calls are not awaited).
   */
  async init(): Promise<void> {
    type Payload = RoadData | HobbitData;

    this.kafka = new KafkaManager({ brokers: ['kafka-server:9092'] });

    const producerOptions = {};
    const producer = await this.kafka.createProducer(producerOptions);

    const consumerOptions = { groupId: "hello" };
    const consumer = await this.kafka.createConsumer(consumerOptions);

    const topic = 'traffic-loop';

    // One record carrying two messages; the message key tells the
    // consumer-side handlers which kind of payload they are looking at.
    const record: ProducerRecord = {
      topic,
      messages: [
        {
          headers: { 'sequenceId': '2' },
          key: 'remove road',
          value: { name: 'my awesome name', age: 124 },
        },
        {
          headers: { 'sequenceId': '2' },
          key: 'add hobbit',
          value: { name: 'hobbit', address: 'Esgaroth' },
        },
      ],
    };

    // Shared key-based dispatch used by both the single-message and the
    // batch handler.
    const dispatchByKey = (incoming: KafkaMessage<Payload>): void => {
      switch (incoming.key) {
        case "add hobbit":
          console.log("Handle the hobbit message");
          break;
        case "remove road":
          console.log("Handle the road message");
          break;
        default:
          console.log("Handle unexpected default");
      }
    };

    // Batch handler: dispatch every message first, then log the whole batch.
    const eachBatchFn = (messages: KafkaMessage<Payload>[]): void => {
      for (const entry of messages) {
        dispatchByKey(entry);
      }
      console.log("batch messages", messages);
    };

    // Single-message handler: log the message first, then dispatch it.
    const eachMessageFn = (message: KafkaMessage<Payload>): void => {
      console.log("messages", message);
      dispatchByKey(message);
    };

    // Re-publish the same record every 5 seconds.
    setInterval(() => {
      console.log('publishing');
      this.kafka.publish(producer, record);
    }, 5000);

    // NOTE(review): both subscriptions below reuse the same consumer
    // instance — confirm KafkaManager supports registering both handler
    // styles on a single consumer.
    this.kafka.subscribe<Payload>(consumer, {
      subscribeTopic: { topic },
      eachMessage: eachMessageFn,
    });

    this.kafka.subscribe<Payload>(consumer, {
      subscribeTopic: { topic },
      eachBatch: eachBatchFn,
    });
  }
}