node-red-contrib-kafkajs
Advanced tools
Comparing version 0.0.3 to 0.0.5
@@ -9,3 +9,8 @@ module.exports = function(RED) { | ||
let client = RED.nodes.getNode(config.client); | ||
let client = RED.nodes.getNode(config.client); | ||
if(!client){ | ||
return; | ||
} | ||
const kafka = new Kafka(client.options) | ||
@@ -66,3 +71,3 @@ | ||
node.onError = function (e){ | ||
node.error(e.message); | ||
node.error("Kafka Consumer Error", e.message); | ||
node.status({fill:"red",shape:"ring",text:"Error"}); | ||
@@ -69,0 +74,0 @@ } |
@@ -15,28 +15,29 @@ module.exports = function(RED) { | ||
let client = RED.nodes.getNode(config.client) | ||
let client = RED.nodes.getNode(config.client); | ||
let kafka = new Kafka(client.options) | ||
let kafka = new Kafka(client.options); | ||
if(!client){ | ||
return; | ||
} | ||
var producerOptions = new Object(); | ||
var sendOptions = new Object(); | ||
sendOptions.topic = config.topic; | ||
sendOptions.topic = config.topic || null; | ||
if(config.advancedoptions){ | ||
producerOptions.metadataMaxAge = config.metadatamaxage; | ||
producerOptions.allowAutoTopicCreation = config.allowautotopiccreation; | ||
producerOptions.transactionTimeout = config.transactiontimeout; | ||
sendOptions.partition = config.partition; | ||
sendOptions.key = config.key; | ||
producerOptions.metadataMaxAge = config.metadatamaxage; | ||
producerOptions.allowAutoTopicCreation = config.allowautotopiccreation; | ||
producerOptions.transactionTimeout = config.transactiontimeout; | ||
sendOptions.partition = config.partition || null; | ||
sendOptions.key = config.key || null; | ||
sendOptions.headers = config.headeritems; | ||
sendOptions.acks = acksDict[config.acknowledge]; | ||
sendOptions.timeout = config.responsetimeout; | ||
sendOptions.headers = config.headeritems; | ||
sendOptions.acks = acksDict[config.acknowledge]; | ||
sendOptions.timeout = config.responsetimeout; | ||
} | ||
node.sendOptions = sendOptions; | ||
node.init = async function init(){ | ||
const producer = kafka.producer() | ||
const producer = kafka.producer(); | ||
node.producer = producer; | ||
@@ -83,33 +84,32 @@ | ||
if(node.ready){ | ||
var sendOptions = new Object(); | ||
sendOptions.topic = node.sendOptions.topic || msg.topic; | ||
sendOptions.acks = node.sendOptions.acks; | ||
sendOptions.timeout = node.sendOptions.timeout; | ||
if(msg.payload){ | ||
sendOptions.messages = [] | ||
var message = new Object(); | ||
if(node.sendOptions.key || msg.key){ | ||
message.key = node.sendOptions.key || msg.key; | ||
} | ||
var sendOptions = new Object(); | ||
if(Object.keys(node.sendOptions.headers).length != 0 || msg.headers){ | ||
message.headers = Object.keys(node.sendOptions.headers).length != 0 ? node.sendOptions.headers : msg.headers; | ||
} | ||
sendOptions.topic = node.sendOptions.topic || msg.topic || null; | ||
sendOptions.acks = node.sendOptions.acks || null; | ||
sendOptions.timeout = node.sendOptions.timeout || null; | ||
if(node.sendOptions.partition || msg.partition){ | ||
message.partition = node.sendOptions.partition || msg.partition; | ||
sendOptions.messages = []; | ||
var message = new Object(); | ||
message.key = node.sendOptions.key || msg.key || null; | ||
message.headers = node.sendOptions.headers; | ||
message.headers = Object.keys(message.headers).length === 0 ? msg.headers : message.headers; | ||
message.partition = node.sendOptions.partition || msg.partition || null; | ||
message.value = msg.payload; | ||
sendOptions.messages.push(message); | ||
node.producer.send(sendOptions).catch((e)=>{ | ||
node.error("Kafka Producer Error", e); | ||
node.status({fill:"red",shape:"ring",text:"Error"}); | ||
}) | ||
node.lastMessageTime = new Date().getTime(); | ||
node.status({fill:"blue",shape:"ring",text:"Sending"}); | ||
} | ||
message.value = msg.payload; | ||
sendOptions.messages.push(message); | ||
node.producer.send(sendOptions).catch((e)=>{ | ||
node.error(e.message); | ||
node.status({fill:"red",shape:"ring",text:"Error"}); | ||
}) | ||
node.lastMessageTime = new Date().getTime(); | ||
node.status({fill:"blue",shape:"ring",text:"Sending"}); | ||
} | ||
@@ -116,0 +116,0 @@ }); |
@@ -15,3 +15,3 @@ { | ||
"homepage": "https://github.com/emrebekar/node-red-contrib-kafkajs#readme", | ||
"version":"0.0.3", | ||
"version":"0.0.5", | ||
"license" :"Apache 2.0", | ||
@@ -18,0 +18,0 @@ "keywords": [ |
105928
249