Socket
Book a DemoInstallSign in
Socket

@imec/digital-twin-kafka-utils

Package Overview
Dependencies
Maintainers
4
Versions
13
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@imec/digital-twin-kafka-utils

## Description Kafka manager is a wrapper around Kafkajs for the Digital Twin project. It's meant to be available publicly from NPM.

npmnpm
Version
0.2.1
Version published
Weekly downloads
1
Maintainers
4
Weekly downloads
 
Created
Source

Kafka Connection Manager

Description

Kafka manager is a wrapper around Kafkajs for the Digital Twin project. It's meant to be available publicly from NPM.

How to use it

Example to create and initialize an instance

import KafkaManager, { ProducerRecord, KafkaMessage } from ".";

interface RoadData{
	name: string,
	age: number
}

interface HobbitData{
	name: string,
	address: string
}

export default class Example {
  kafka!: KafkaManager

	async init() {
    this.kafka = new KafkaManager({ brokers: ['kafka-server:9092']});
    const producerConfig = {} // producer config from kafkajs, optional can be left out
    const kafkaProducer = await this.kafka.createProducer(producerConfig);
    const consumerConfig = {groupId: "hello"};
    const kafkaConsumer = await this.kafka.createConsumer(consumerConfig);
    const topic = 'traffic-loop';

		const record: ProducerRecord = {
			topic,
			messages: [
        {
          headers: {'sequenceId': '2'},
          key: 'remove road',
          value: {name: 'my awesome name', age: 124}
        },
        {
          headers: {'sequenceId': '2'},
          key: 'add hobbit',
          value: {name: 'hobbit', address: 'Esgaroth'}
        }
      ]
	  }
    
    setInterval(() => {
      console.log('publishing');
      this.kafka.publish(kafkaProducer, record);
    }, 5000)

    function eachBatchFn(messages: KafkaMessage<RoadData|HobbitData>[]) {
      messages.forEach(element => {
        switch (element.key) {
        case "add hobbit":
          console.log("Handle the hobbit message");
          break;
        case "remove road":
          console.log("Handle the road message");
          break;
        default:
          console.log("Handle unexpected default");
        }
      });
      console.log("batch messages", messages)
    }
    
    function eachMessageFn(message: KafkaMessage<RoadData|HobbitData>) {
      console.log("messages", message)
	    switch (message.key) {
        case "add hobbit":
			    console.log("Handle the hobbit message");
          break;
        case "remove road":
			    console.log("Handle the road message");
          break;
        default:
          console.log("Handle unexpected default");
        }
		}
    
    // Subscribing to a topic can be done using the two methods below. Note that you cannot have `eachMessage` and `eachBatch` in the same subscription! This will cause kafkajs to only run one of the two.

    // Use for each message subscription
    this.kafka.subscribe<RoadData|HobbitData>(kafkaConsumer, {
      subscribeTopic: {topic},
      eachMessage: eachMessageFn
    });

    // Use batch subscription
    this.kafka.subscribe<RoadData|HobbitData>(kafkaConsumer, {
      subscribeTopic: {topic},
      eachBatch: eachBatchFn,
    });
	}
}

FAQs

Package last updated on 16 Mar 2020

Did you know?

Socket

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Install

Related posts