node-red-contrib-kafka-client
Advanced tools
Comparing version 0.0.5 to 0.0.6
@@ -29,2 +29,3 @@ module.exports = function(RED) { | ||
node.error(err); | ||
init(); | ||
} | ||
@@ -31,0 +32,0 @@ |
@@ -8,35 +8,38 @@ module.exports = function(RED) { | ||
node.ready = false; | ||
let broker = RED.nodes.getNode(config.broker); | ||
let kafkaClient = new kafka.KafkaClient(broker.getOptions());; | ||
let producerOptions = new Object(); | ||
producerOptions.requireAcks = config.requireAcks; | ||
producerOptions.ackTimeoutMs = config.ackTimeoutMs; | ||
node.producer = new kafka.HighLevelProducer(kafkaClient, producerOptions); | ||
var init = function(){ | ||
let broker = RED.nodes.getNode(config.broker); | ||
node.onError = function(err){ | ||
node.ready = false; | ||
node.status({fill:"red",shape:"ring",text:"Error"}); | ||
node.error(err); | ||
let kafkaClient = new kafka.KafkaClient(broker.getOptions());; | ||
let producerOptions = new Object(); | ||
producerOptions.requireAcks = config.requireAcks; | ||
producerOptions.ackTimeoutMs = config.ackTimeoutMs; | ||
node.producer = new kafka.HighLevelProducer(kafkaClient, producerOptions); | ||
node.onError = function(err){ | ||
node.status({fill:"red",shape:"ring",text:"Error"}); | ||
node.error(err); | ||
} | ||
node.onReady = function(){ | ||
node.ready = true; | ||
node.status({fill:"green",shape:"ring",text:"Ready"}); | ||
} | ||
node.producer.on('ready', node.onReady); | ||
node.producer.on('error', node.onError); | ||
} | ||
node.onReady = function(){ | ||
node.ready = true; | ||
node.status({fill:"green",shape:"ring",text:"Ready"}); | ||
} | ||
node.producer.on('ready', node.onReady); | ||
node.producer.on('error', node.onError); | ||
let sendOptions = new Object(); | ||
sendOptions.topic = config.topic; | ||
sendOptions.attributes = config.attributes; | ||
init(); | ||
node.on('input', function(msg) { | ||
if(node.ready){ | ||
sendOptions.messages =[msg.payload]; | ||
var sendOptions = new Object(); | ||
sendOptions.topic = config.topic; | ||
sendOptions.attributes = config.attributes; | ||
sendOptions.messages = [msg.payload]; | ||
node.producer.send([sendOptions],function (err) { | ||
@@ -43,0 +46,0 @@ if(!err){ |
{ | ||
"name": "node-red-contrib-kafka-client", | ||
"version": "0.0.5", | ||
"version": "0.0.6", | ||
"description": "Node-RED Kafka Consumer and Producer", | ||
@@ -5,0 +5,0 @@ "node-red": { |
Sorry, the diff of this file is not supported yet
88364
122