node-red-contrib-kafka-manager
Advanced tools
Comparing version 0.2.7 to 0.2.8
@@ -38,60 +38,68 @@ const ts=(new Date().toString()).split(' '); | ||
switch (msg.topic) { | ||
case 'createTopics': | ||
case 'deleteTopics': | ||
data.forEach((c,i,a)=>{ | ||
let t=msg.payload.find((cp)=>cp.topic==c.topic); | ||
debug({label:"msgProcess multi response",topic:c,data:t}); | ||
if(c.hasOwnProperty('error')) { | ||
debug({label:"createTopics multi response",data:{topic:msg.topic,error:c.error,payload:[t]}}); | ||
node.send([null,{topic:msg.topic,error:c.error,payload:[t]}]); | ||
return; | ||
} | ||
debug({label:"msgProcess multi response ok",data:{topic:msg.topic,payload:[c]}}); | ||
node.send({topic:msg.topic,payload:[t]}); | ||
}); | ||
default: | ||
msg.payload=data; | ||
node.send(msg); | ||
case 'createAcls': | ||
case 'createTopics': | ||
case 'deleteAcls': | ||
case 'deleteAcls': | ||
case 'deleteTopics': | ||
case 'electPreferredLeaders': | ||
data.forEach((c,i,a)=>{ | ||
let t=msg.payload.find((cp)=>cp.topic==c.topic); | ||
debug({label:"msgProcess multi response",topic:c,data:t}); | ||
if(c.hasOwnProperty('error')) { | ||
debug({label:"msgProcess multi response",data:{topic:msg.topic,error:c.error,payload:[t]}}); | ||
node.send([null,{topic:msg.topic,error:c.error,payload:[t]}]); | ||
return; | ||
} | ||
debug({label:"msgProcess multi response ok",data:{topic:msg.topic,payload:[c]}}); | ||
node.send({topic:msg.topic,payload:[t]}); | ||
}); | ||
break; | ||
default: | ||
msg.payload=data; | ||
node.send(msg); | ||
} | ||
} | ||
const processInputNoArg=[ | ||
"describeCluster","describeDelegationToken","describeReplicaLogDirs","listConsumerGroups","listGroups","listTopics" | ||
]; | ||
const processInputPayloadArg=[ | ||
"alterConfigs","alterReplicaLogDirs","createAcls","","createDelegationToken", | ||
"createPartitions","createTopics","deleteAcls","deleteConsumerGroups","deleteRecords", | ||
"deleteTopics","describeAcls","describeConsumerGroups", | ||
"describeGroups","describeLogDirs","describeTopics","electPreferredLeaders", | ||
"expireDelegationToken","incrementalAlterConfigs","listConsumerGroupOffsets", | ||
"renewDelegationToken" | ||
]; | ||
function processInput(node,msg){ | ||
debug({label:"processInput",msg}); | ||
try{ | ||
if(processInputNoArg.includes(msg.topic)) { | ||
debug({label:"processInput processInputNoArg",msg}); | ||
node.connection[msg.topic]((err,data)=>msgProcess(node,msg,err,data)); | ||
return; | ||
} | ||
if(processInputPayloadArg.includes(msg.topic)) { | ||
debug({label:"processInput processInputPayloadArg",msg}); | ||
node.connection[msg.topic](msg.payload,(err,data)=>msgProcess(node,msg,err,data)); | ||
return; | ||
} | ||
switch(msg.topic) { | ||
case'listGroups': | ||
node.connection.listGroups((err,data)=>msgProcess(node,msg,err,data)); | ||
return; | ||
case'describeGroups': | ||
node.connection.describeGroups(msg.payload,(err,data)=>msgProcess(node,msg,err,data)); | ||
return; | ||
case'listTopics': | ||
node.connection.listTopics((err,data)=>msgProcess(node,msg,err,data)); | ||
return; | ||
case'createTopics': | ||
// msg.payload = [{topic: 'topic1',partitions: 1,replicationFactor: 2}]; | ||
node.connection.createTopics(msg.payload,(err,data)=>msgProcess(node,msg,err,data)); | ||
return; | ||
case'deleteTopics': | ||
// msg.payload = ['topic1']; | ||
node.connection.deleteTopics(msg.payload,(err,data)=>msgProcess(node,msg,err,data)); | ||
return; | ||
case'describeConfigs': | ||
// msg.payload={type:'topic',name:'a-topic'} | ||
const resource = { | ||
resourceType: node.connection.RESOURCE_TYPES[msg.payload.type||'topic'], // 'broker' or 'topic' | ||
resourceName: msg.payload.name, | ||
configNames: [] // specific config names, or empty array to return all, | ||
} | ||
const payload = { | ||
resources: [resource], | ||
includeSynonyms: false // requires kafka 2.0+ | ||
}; | ||
node.connection.describeConfigs(payload, (err,data)=>msgProcess(node,msg,err,data)); | ||
return; | ||
case'refreshMetadata': | ||
node.connection.refreshMetadata(); | ||
return; | ||
default: throw Error("invalid message topic"); | ||
case 'describeConfigs': | ||
// msg.payload={type:'topic',name:'a-topic'} | ||
const resource = { | ||
resourceType: node.connection.RESOURCE_TYPES[msg.payload.type||'topic'], // 'broker' or 'topic' | ||
resourceName: msg.payload.name, | ||
configNames: [] // specific config names, or empty array to return all, | ||
} | ||
const payload = { | ||
resources: [resource], | ||
includeSynonyms: false // requires kafka 2.0+ | ||
}; | ||
node.connection.describeConfigs(payload, (err,data)=>msgProcess(node,msg,err,data)); | ||
return; | ||
default: throw Error("invalid message topic"); | ||
} | ||
} catch(e) { | ||
debug({label:"input catch",error:e,msg:msg,connection:Object.keys(node.connection)}); | ||
debug({label:"processInput catch",error:e,msg:msg,connection:Object.keys(node.connection)}); | ||
msg.error=e.toString(); | ||
@@ -167,19 +175,7 @@ node.send([null,msg]); | ||
try { | ||
switch (req.params.action) { | ||
case 'listGroups': | ||
node.connection.listGroups((err,data)=>adminRequest(node,res,err,data)); | ||
break; | ||
case 'describeGroups': | ||
node.connection.describeGroups(msg.payload,(err,data)=>adminRequest(node,res,err,data)); | ||
return; | ||
case 'listTopics': | ||
node.connection.listTopics((err,data)=>adminRequest(node,res,err,data)); | ||
return; | ||
case 'connect': | ||
debug({label:"httpadmin connect",connection:Object.keys(node.connection)}); | ||
node.connection.connect((err,data)=>adminRequest(node,res,err,data)); | ||
return; | ||
default: | ||
throw Error("unknown action: "+req.params.action); | ||
} | ||
if(processInputNoArg.includes(req.params.action)) { | ||
node.connection[req.params.action]((err,data)=>adminRequest(node,res,err,data)); | ||
return; | ||
} | ||
throw Error("unknown action: "+req.params.action); | ||
} catch(err) { | ||
@@ -186,0 +182,0 @@ debug({label:"httpAdmin",error:err,request:req.params,connection:Object.keys(node.connection)}); |
{ | ||
"name": "node-red-contrib-kafka-manager", | ||
"version": "0.2.7", | ||
"version": "0.2.8", | ||
"description": "Node-RED implements Kafka manager with associand associated .", | ||
@@ -5,0 +5,0 @@ "dependencies": {}, |
@@ -113,3 +113,4 @@ # [node-red-contrib-kafka-manager][2] | ||
# Version | ||
0.2.8 Added all admin api's per Kafka 2.3 but dependent on [kafka-node][4] update. | ||
Remove refresh metadata, automated if problem. Fix consumer group errors. Add tests for admin calls. | ||
0.2.7 If offsetOutOfRange pause consumer. Added in deleteTopics but dependant on [kafka-node][4] update. | ||
@@ -116,0 +117,0 @@ 0.2.6 More fixes for error processing on invalid topic |
@@ -389,3 +389,3 @@ [ | ||
"x": 130, | ||
"y": 520, | ||
"y": 500, | ||
"wires": [ | ||
@@ -410,3 +410,3 @@ [ | ||
"x": 130, | ||
"y": 580, | ||
"y": 540, | ||
"wires": [ | ||
@@ -413,0 +413,0 @@ [ |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
869074
149
1561