kafka-observe
Advanced tools
Comparing version 1.1.7-alpha9 to 1.1.7
@@ -69,3 +69,2 @@ const { Consumer, ConsumerGroup, Producer } = require("./IO"); | ||
logAllEvents, | ||
protocol: consumerGroupProtocol, | ||
reconnectErrors: kafkaConfig.reconnectErrors, | ||
@@ -72,0 +71,0 @@ }); |
@@ -6,13 +6,7 @@ const kafka = require("kafka-node"); | ||
const Consumer = kafka.Consumer; | ||
let client; | ||
const Consumer = kafka.ConsumerGroup; | ||
module.exports = class ConsumerClass { | ||
module.exports = class ConsumerGroupClass { | ||
constructor(options) { | ||
this.options = options; | ||
client = new kafka.KafkaClient({ | ||
kafkaHost: this.options.kafkaHost, | ||
}); | ||
this.initObservables(); | ||
@@ -23,5 +17,9 @@ this.addListener(); | ||
initObservables() { | ||
this.options.topicsToFollow.forEach(topic => { | ||
this[`${topic}Subject`] = new Subject(); | ||
}); | ||
if(this.options.topicsToFollow.forEach) { | ||
this.options.topicsToFollow.forEach(topic => { | ||
this[`${topic}Subject`] = new Subject(); | ||
}); | ||
} else { | ||
this[`${this.options.topicsToFollow}Subject`] = new Subject(); | ||
} | ||
} | ||
@@ -33,41 +31,12 @@ | ||
loadTopicMetaData() { | ||
return new Promise((res,rej) => { | ||
client.loadMetadataForTopics(this.options.topicsToFollow, (error, metadata) => { | ||
if(error) { | ||
logging.error(error); | ||
return; | ||
} | ||
const result = []; | ||
const data = metadata[1].metadata; | ||
for (const topic in data) { | ||
for (const topicPartition in data[topic]) { | ||
result.push({ | ||
topic: topic, | ||
partition: Number(topicPartition), | ||
}) | ||
} | ||
} | ||
res(result); | ||
}); | ||
}); | ||
} | ||
async addListener() { | ||
const listenTopics = await this.loadTopicMetaData() | ||
console.log(JSON.stringify(listenTopics)); | ||
let consumer = new Consumer(client, | ||
listenTopics, | ||
addListener() { | ||
const consumer = new Consumer( | ||
{ | ||
kafkaHost: this.options.kafkaHost, | ||
groupId: this.options.consumerId, | ||
} | ||
protocol: ["range"], | ||
}, | ||
this.options.topicsToFollow, | ||
); | ||
consumer.on("message", (message) => { | ||
@@ -104,8 +73,4 @@ if (message && message.value && message.topic) { | ||
consumer.close(true, () => { | ||
client = new kafka.KafkaClient({ | ||
kafkaHost: this.options.kafkaHost, | ||
}); | ||
this.addListener(); | ||
}); | ||
} | ||
@@ -112,0 +77,0 @@ |
@@ -34,3 +34,3 @@ const kafka = require("kafka-node"); | ||
groupId: this.options.consumerGroupId, | ||
protocol: this.options.protocol, | ||
protocol: ["roundrobin"], | ||
@@ -37,0 +37,0 @@ }, |
{ | ||
"name": "kafka-observe", | ||
"version": "1.1.7-alpha9", | ||
"version": "1.1.7", | ||
"description": "", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
No v1
QualityPackage is not semver >=1. This means it is not stable and does not support ^ ranges.
Found 1 instance in 1 package
2
17153
376