New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

node-red-contrib-kafkajs

Package Overview
Dependencies
Maintainers
1
Versions
6
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

node-red-contrib-kafkajs - npm Package Compare versions

Comparing version 0.0.3 to 0.0.5

9

js/kafka-consumer.js

@@ -9,3 +9,8 @@ module.exports = function(RED) {

let client = RED.nodes.getNode(config.client);
let client = RED.nodes.getNode(config.client);
if(!client){
return;
}
const kafka = new Kafka(client.options)

@@ -66,3 +71,3 @@

node.onError = function (e){
node.error(e.message);
node.error("Kafka Consumer Error", e.message);
node.status({fill:"red",shape:"ring",text:"Error"});

@@ -69,0 +74,0 @@ }

@@ -15,28 +15,29 @@ module.exports = function(RED) {

let client = RED.nodes.getNode(config.client)
let client = RED.nodes.getNode(config.client);
let kafka = new Kafka(client.options)
let kafka = new Kafka(client.options);
if(!client){
return;
}
var producerOptions = new Object();
var sendOptions = new Object();
sendOptions.topic = config.topic;
sendOptions.topic = config.topic || null;
if(config.advancedoptions){
producerOptions.metadataMaxAge = config.metadatamaxage;
producerOptions.allowAutoTopicCreation = config.allowautotopiccreation;
producerOptions.transactionTimeout = config.transactiontimeout;
sendOptions.partition = config.partition;
sendOptions.key = config.key;
producerOptions.metadataMaxAge = config.metadatamaxage;
producerOptions.allowAutoTopicCreation = config.allowautotopiccreation;
producerOptions.transactionTimeout = config.transactiontimeout;
sendOptions.partition = config.partition || null;
sendOptions.key = config.key || null;
sendOptions.headers = config.headeritems;
sendOptions.acks = acksDict[config.acknowledge];
sendOptions.timeout = config.responsetimeout;
sendOptions.headers = config.headeritems;
sendOptions.acks = acksDict[config.acknowledge];
sendOptions.timeout = config.responsetimeout;
}
node.sendOptions = sendOptions;
node.init = async function init(){
const producer = kafka.producer()
const producer = kafka.producer();
node.producer = producer;

@@ -83,33 +84,32 @@

if(node.ready){
var sendOptions = new Object();
sendOptions.topic = node.sendOptions.topic || msg.topic;
sendOptions.acks = node.sendOptions.acks;
sendOptions.timeout = node.sendOptions.timeout;
if(msg.payload){
sendOptions.messages = []
var message = new Object();
if(node.sendOptions.key || msg.key){
message.key = node.sendOptions.key || msg.key;
}
var sendOptions = new Object();
if(Object.keys(node.sendOptions.headers).length != 0 || msg.headers){
message.headers = Object.keys(node.sendOptions.headers).length != 0 ? node.sendOptions.headers : msg.headers;
}
sendOptions.topic = node.sendOptions.topic || msg.topic || null;
sendOptions.acks = node.sendOptions.acks || null;
sendOptions.timeout = node.sendOptions.timeout || null;
if(node.sendOptions.partition || msg.partition){
message.partition = node.sendOptions.partition || msg.partition;
sendOptions.messages = [];
var message = new Object();
message.key = node.sendOptions.key || msg.key || null;
message.headers = node.sendOptions.headers;
message.headers = Object.keys(message.headers).length === 0 ? msg.headers : message.headers;
message.partition = node.sendOptions.partition || msg.partition || null;
message.value = msg.payload;
sendOptions.messages.push(message);
node.producer.send(sendOptions).catch((e)=>{
node.error("Kafka Producer Error", e);
node.status({fill:"red",shape:"ring",text:"Error"});
})
node.lastMessageTime = new Date().getTime();
node.status({fill:"blue",shape:"ring",text:"Sending"});
}
message.value = msg.payload;
sendOptions.messages.push(message);
node.producer.send(sendOptions).catch((e)=>{
node.error(e.message);
node.status({fill:"red",shape:"ring",text:"Error"});
})
node.lastMessageTime = new Date().getTime();
node.status({fill:"blue",shape:"ring",text:"Sending"});
}

@@ -116,0 +116,0 @@ });

@@ -15,3 +15,3 @@ {

"homepage": "https://github.com/emrebekar/node-red-contrib-kafkajs#readme",
"version":"0.0.3",
"version":"0.0.5",
"license" :"Apache 2.0",

@@ -18,0 +18,0 @@ "keywords": [

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc