Socket
Socket
Sign inDemoInstall

node-red-contrib-kafka-manager

Package Overview
Dependencies
Maintainers
1
Versions
27
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

node-red-contrib-kafka-manager - npm Package Compare versions

Comparing version 0.2.7 to 0.2.8

132

kafkaManager/kafkaAdmin.js

@@ -38,60 +38,68 @@ const ts=(new Date().toString()).split(' ');

switch (msg.topic) {
case 'createTopics':
case 'deleteTopics':
data.forEach((c,i,a)=>{
let t=msg.payload.find((cp)=>cp.topic==c.topic);
debug({label:"msgProcess multi response",topic:c,data:t});
if(c.hasOwnProperty('error')) {
debug({label:"createTopics multi response",data:{topic:msg.topic,error:c.error,payload:[t]}});
node.send([null,{topic:msg.topic,error:c.error,payload:[t]}]);
return;
}
debug({label:"msgProcess multi response ok",data:{topic:msg.topic,payload:[c]}});
node.send({topic:msg.topic,payload:[t]});
});
default:
msg.payload=data;
node.send(msg);
case 'createAcls':
case 'createTopics':
case 'deleteAcls':
case 'deleteAcls':
case 'deleteTopics':
case 'electPreferredLeaders':
data.forEach((c,i,a)=>{
let t=msg.payload.find((cp)=>cp.topic==c.topic);
debug({label:"msgProcess multi response",topic:c,data:t});
if(c.hasOwnProperty('error')) {
debug({label:"msgProcess multi response",data:{topic:msg.topic,error:c.error,payload:[t]}});
node.send([null,{topic:msg.topic,error:c.error,payload:[t]}]);
return;
}
debug({label:"msgProcess multi response ok",data:{topic:msg.topic,payload:[c]}});
node.send({topic:msg.topic,payload:[t]});
});
break;
default:
msg.payload=data;
node.send(msg);
}
}
const processInputNoArg=[
"describeCluster","describeDelegationToken","describeReplicaLogDirs","listConsumerGroups","listGroups","listTopics"
];
const processInputPayloadArg=[
"alterConfigs","alterReplicaLogDirs","createAcls","","createDelegationToken",
"createPartitions","createTopics","deleteAcls","deleteConsumerGroups","deleteRecords",
"deleteTopics","describeAcls","describeConsumerGroups",
"describeGroups","describeLogDirs","describeTopics","electPreferredLeaders",
"expireDelegationToken","incrementalAlterConfigs","listConsumerGroupOffsets",
"renewDelegationToken"
];
function processInput(node,msg){
debug({label:"processInput",msg});
try{
if(processInputNoArg.includes(msg.topic)) {
debug({label:"processInput processInputNoArg",msg});
node.connection[msg.topic]((err,data)=>msgProcess(node,msg,err,data));
return;
}
if(processInputPayloadArg.includes(msg.topic)) {
debug({label:"processInput processInputPayloadArg",msg});
node.connection[msg.topic](msg.payload,(err,data)=>msgProcess(node,msg,err,data));
return;
}
switch(msg.topic) {
case'listGroups':
node.connection.listGroups((err,data)=>msgProcess(node,msg,err,data));
return;
case'describeGroups':
node.connection.describeGroups(msg.payload,(err,data)=>msgProcess(node,msg,err,data));
return;
case'listTopics':
node.connection.listTopics((err,data)=>msgProcess(node,msg,err,data));
return;
case'createTopics':
// msg.payload = [{topic: 'topic1',partitions: 1,replicationFactor: 2}];
node.connection.createTopics(msg.payload,(err,data)=>msgProcess(node,msg,err,data));
return;
case'deleteTopics':
// msg.payload = ['topic1'];
node.connection.deleteTopics(msg.payload,(err,data)=>msgProcess(node,msg,err,data));
return;
case'describeConfigs':
// msg.payload={type:'topic',name:'a-topic'}
const resource = {
resourceType: node.connection.RESOURCE_TYPES[msg.payload.type||'topic'], // 'broker' or 'topic'
resourceName: msg.payload.name,
configNames: [] // specific config names, or empty array to return all,
}
const payload = {
resources: [resource],
includeSynonyms: false // requires kafka 2.0+
};
node.connection.describeConfigs(payload, (err,data)=>msgProcess(node,msg,err,data));
return;
case'refreshMetadata':
node.connection.refreshMetadata();
return;
default: throw Error("invalid message topic");
case 'describeConfigs':
// msg.payload={type:'topic',name:'a-topic'}
const resource = {
resourceType: node.connection.RESOURCE_TYPES[msg.payload.type||'topic'], // 'broker' or 'topic'
resourceName: msg.payload.name,
configNames: [] // specific config names, or empty array to return all,
}
const payload = {
resources: [resource],
includeSynonyms: false // requires kafka 2.0+
};
node.connection.describeConfigs(payload, (err,data)=>msgProcess(node,msg,err,data));
return;
default: throw Error("invalid message topic");
}
} catch(e) {
debug({label:"input catch",error:e,msg:msg,connection:Object.keys(node.connection)});
debug({label:"processInput catch",error:e,msg:msg,connection:Object.keys(node.connection)});
msg.error=e.toString();

@@ -167,19 +175,7 @@ node.send([null,msg]);

try {
switch (req.params.action) {
case 'listGroups':
node.connection.listGroups((err,data)=>adminRequest(node,res,err,data));
break;
case 'describeGroups':
node.connection.describeGroups(msg.payload,(err,data)=>adminRequest(node,res,err,data));
return;
case 'listTopics':
node.connection.listTopics((err,data)=>adminRequest(node,res,err,data));
return;
case 'connect':
debug({label:"httpadmin connect",connection:Object.keys(node.connection)});
node.connection.connect((err,data)=>adminRequest(node,res,err,data));
return;
default:
throw Error("unknown action: "+req.params.action);
}
if(processInputNoArg.includes(req.params.action)) {
node.connection[req.params.action]((err,data)=>adminRequest(node,res,err,data));
return;
}
throw Error("unknown action: "+req.params.action);
} catch(err) {

@@ -186,0 +182,0 @@ debug({label:"httpAdmin",error:err,request:req.params,connection:Object.keys(node.connection)});

{
"name": "node-red-contrib-kafka-manager",
"version": "0.2.7",
"version": "0.2.8",
"description": "Node-RED implements Kafka manager with associand associated .",

@@ -5,0 +5,0 @@ "dependencies": {},

@@ -113,3 +113,4 @@ # [node-red-contrib-kafka-manager][2]

# Version
0.2.8 Added all admin api's per Kafka 2.3 but dependent on [kafka-node][4] update.
Remove refresh metadata, automated if problem. Fix consumer group errors. Add tests for admin calls.
0.2.7 If offsetOutOfRange pause consumer. Added in deleteTopics but dependant on [kafka-node][4] update.

@@ -116,0 +117,0 @@ 0.2.6 More fixes for error processing on invalid topic

@@ -389,3 +389,3 @@ [

"x": 130,
"y": 520,
"y": 500,
"wires": [

@@ -410,3 +410,3 @@ [

"x": 130,
"y": 580,
"y": 540,
"wires": [

@@ -413,0 +413,0 @@ [

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc