New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

kafka-observe

Package Overview
Dependencies
Maintainers
1
Versions
130
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

kafka-observe - npm Package Compare versions

Comparing version 1.1.7-alpha9 to 1.1.7

1

index.js

@@ -69,3 +69,2 @@ const { Consumer, ConsumerGroup, Producer } = require("./IO");

logAllEvents,
protocol: consumerGroupProtocol,
reconnectErrors: kafkaConfig.reconnectErrors,

@@ -72,0 +71,0 @@ });

65

IO/consumer.js

@@ -6,13 +6,7 @@ const kafka = require("kafka-node");

const Consumer = kafka.Consumer;
let client;
const Consumer = kafka.ConsumerGroup;
module.exports = class ConsumerClass {
module.exports = class ConsumerGroupClass {
constructor(options) {
this.options = options;
client = new kafka.KafkaClient({
kafkaHost: this.options.kafkaHost,
});
this.initObservables();

@@ -23,5 +17,9 @@ this.addListener();

initObservables() {
this.options.topicsToFollow.forEach(topic => {
this[`${topic}Subject`] = new Subject();
});
if(this.options.topicsToFollow.forEach) {
this.options.topicsToFollow.forEach(topic => {
this[`${topic}Subject`] = new Subject();
});
} else {
this[`${this.options.topicsToFollow}Subject`] = new Subject();
}
}

@@ -33,41 +31,12 @@

loadTopicMetaData() {
return new Promise((res,rej) => {
client.loadMetadataForTopics(this.options.topicsToFollow, (error, metadata) => {
if(error) {
logging.error(error);
return;
}
const result = [];
const data = metadata[1].metadata;
for (const topic in data) {
for (const topicPartition in data[topic]) {
result.push({
topic: topic,
partition: Number(topicPartition),
})
}
}
res(result);
});
});
}
async addListener() {
const listenTopics = await this.loadTopicMetaData()
console.log(JSON.stringify(listenTopics));
let consumer = new Consumer(client,
listenTopics,
addListener() {
const consumer = new Consumer(
{
kafkaHost: this.options.kafkaHost,
groupId: this.options.consumerId,
}
protocol: ["range"],
},
this.options.topicsToFollow,
);
consumer.on("message", (message) => {

@@ -104,8 +73,4 @@ if (message && message.value && message.topic) {

consumer.close(true, () => {
client = new kafka.KafkaClient({
kafkaHost: this.options.kafkaHost,
});
this.addListener();
});
}

@@ -112,0 +77,0 @@

@@ -34,3 +34,3 @@ const kafka = require("kafka-node");

groupId: this.options.consumerGroupId,
protocol: this.options.protocol,
protocol: ["roundrobin"],

@@ -37,0 +37,0 @@ },

{
"name": "kafka-observe",
"version": "1.1.7-alpha9",
"version": "1.1.7",
"description": "",

@@ -5,0 +5,0 @@ "main": "index.js",

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc