node-red-contrib-kafka-manager
Advanced tools
Comparing version 0.4.2 to 0.4.3
@@ -53,6 +53,13 @@ const logger = new (require("node-red-contrib-logger"))("Kafka Consumer"); | ||
} | ||
const sendMessage = function(node, message) { | ||
if(node.convertToJson){ | ||
message.value = JSON.parse(message.value); | ||
} | ||
node.brokerNode.sendMsg(node, message) | ||
} | ||
if (Array.isArray(message)) { | ||
message.forEach((r) => node.brokerNode.sendMsg(node, r)) | ||
message.forEach((r) => sendMessage(node, r)) | ||
} else { | ||
node.brokerNode.sendMsg(node, message) | ||
sendMessage(node, message) | ||
} | ||
@@ -59,0 +66,0 @@ }) |
@@ -226,2 +226,5 @@ const logger = new (require("node-red-contrib-logger"))("Kafka Producer"); | ||
if (node.connected) { | ||
if(node.convertFromJson) { | ||
msg.payload = JSON.stringify(msg.payload) | ||
} | ||
producerSend(node, msg) | ||
@@ -228,0 +231,0 @@ return |
{ | ||
"name": "node-red-contrib-kafka-manager", | ||
"version": "0.4.2", | ||
"version": "0.4.3", | ||
"description": "Node-RED implements Kafka manager with associand associated .", | ||
@@ -5,0 +5,0 @@ "dependencies": { |
@@ -136,2 +136,4 @@ # [node-red-contrib-kafka-manager][2] | ||
0.4.3 add convert message payload to/from json and add some basic tests | ||
0.4.1 minor fixes | ||
@@ -138,0 +140,0 @@ |
@@ -1,8 +0,193 @@ | ||
const assert=require('assert'); | ||
//const assert=require('assert'); | ||
const should = require("should"); | ||
const helper = require("node-red-node-test-helper"); | ||
const kafkaAdmin = require("../kafkaManager/kafkaAdmin.js"); | ||
const kafkaBroker = require("../kafkaManager/kafkaBroker.js"); | ||
const kafkaCommit = require("../kafkaManager/kafkaCommit.js"); | ||
const kafkaConsumer = require("../kafkaManager/kafkaConsumer.js"); | ||
const kafkaConsumerGroup = require("../kafkaManager/kafkaConsumerGroup.js"); | ||
const kafkaOffset = require("../kafkaManager/kafkaOffset.js"); | ||
const kafkaProducer = require("../kafkaManager/kafkaProducer.js"); | ||
const kafkaRollback = require("../kafkaManager/kafkaRollback.js"); | ||
const nodes=[kafkaBroker,kafkaAdmin,kafkaCommit,kafkaConsumer,kafkaConsumerGroup,kafkaOffset,kafkaProducer,kafkaRollback] | ||
describe('dummy test', function() { | ||
it('supported', function(done) { | ||
assert.equal(true, true); | ||
helper.init(require.resolve('node-red')); | ||
function getAndTestNodeProperties(o) { | ||
const n = helper.getNode(o.id); | ||
if(n==null) throw Error("can find node "+o.id); | ||
for(let p in o) n.should.have.property(p, o[p]); | ||
return n; | ||
} | ||
const broker={ | ||
"id" : "brokerID", | ||
"type" : "Kafka Broker", | ||
"name" : "Kafta", | ||
"hosts" : [ { | ||
"host" : "127.0.0.1", | ||
"port" : 9092 | ||
} ], | ||
"hostsEnvVar" : "", | ||
"connectTimeout" : "10000", | ||
"requestTimeout" : "30000", | ||
"autoConnect" : "true", | ||
"idleConnection" : "5", | ||
"reconnectOnIdle" : "true", | ||
"maxAsyncRequests" : "10", | ||
"checkInterval" : "10", | ||
"selfSign" : true, | ||
"usetls" : false, | ||
"useCredentials" : false | ||
} ; | ||
const admin={ | ||
"id" : "kafkaAdminId", | ||
"type" : "Kafka Admin", | ||
"name" : "Kafka Admin name", | ||
"broker" : broker.id | ||
}; | ||
const consumer_test={ | ||
"id" : "consumerID", | ||
"type" : "Kafka Consumer", | ||
"name" : "Kafka Consumer Name", | ||
"broker" : broker.id, | ||
"topic" : null, | ||
"topics" : [ { | ||
"topic" : "test", | ||
"offset" : 0, | ||
"partition" : 0 | ||
}, { | ||
"topic" : "atest", | ||
"offset" : 0, | ||
"partition" : 0 | ||
} ], | ||
"groupId" : "kafka-node-group", | ||
"autoCommit" : "true", | ||
"autoCommitIntervalMs" : 5000, | ||
"fetchMaxWaitMs" : 100, | ||
"fetchMinBytes" : 1, | ||
"fetchMaxBytes" : 1048576, | ||
"fromOffset" : 0, | ||
"encoding" : "utf8", | ||
"keyEncoding" : "utf8", | ||
"connectionType" : "Consumer" | ||
}; | ||
const producer={ | ||
"id" : "producerId", | ||
"type" : "Kafka Producer", | ||
"name" : "Kafka Producer Name", | ||
"broker" : broker.id, | ||
"topic" : "test", | ||
"requireAcks" : 1, | ||
"ackTimeoutMs" : 100, | ||
"partitionerType" : 0, | ||
"key" : "", | ||
"partition" : 0, | ||
"attributes" : 0, | ||
"connectionType" : "Producer" | ||
}; | ||
const producerHL={ | ||
"id" : "producerHLId", | ||
"type" : "Kafka Producer", | ||
"name" : "Kafka Producer HL Name", | ||
"broker" : broker.id, | ||
"topic" : "atest", | ||
"requireAcks" : 1, | ||
"ackTimeoutMs" : 100, | ||
"partitionerType" : 0, | ||
"key" : "", | ||
"partition" : 0, | ||
"attributes" : 0, | ||
"connectionType" : "HighLevelProducer" | ||
}; | ||
const consumer_atest={ | ||
"id" : "consumerId", | ||
"type" : "Kafka Consumer", | ||
"name" : "Consumer topic atest", | ||
"broker" : broker.id, | ||
"topics" : [ { | ||
"topic" : "atest", | ||
"offset" : 0, | ||
"partition" : 0 | ||
} ], | ||
"groupId" : "groupTopicAtest", | ||
"autoCommit" : "true", | ||
"autoCommitIntervalMs" : 5000, | ||
"fetchMaxWaitMs" : 100, | ||
"fetchMinBytes" : 1, | ||
"fetchMaxBytes" : 1048576, | ||
"fromOffset" : 0, | ||
"encoding" : "utf8", | ||
"keyEncoding" : "utf8", | ||
"connectionType" : "Consumer" | ||
}; | ||
const consumerGroup={ | ||
"id" : "consumerGroupID", | ||
"type" : "Kafka Consumer Group", | ||
"name" : "consumerGroup", | ||
"broker" : broker.id, | ||
"groupId" : "aGroup", | ||
"sessionTimeout" : 15000, | ||
"protocol" : [ "roundrobin" ], | ||
"encoding" : "utf8", | ||
"fromOffset" : "latest", | ||
"commitOffsetsOnFirstJoin" : "true", | ||
"outOfRangeOffset" : "earliest", | ||
"topics" : [ "test", "topic2" ] | ||
}; | ||
const createTopics={ | ||
"topic" : "createTopics", | ||
"payload" : "[{\"topic\":\"aTestRemoveTopic\",\"partitions\":1,\"replicationFactor\":1},{\"topic\":\"aTestRemoveTopicfail\"},{\"topic\":\"test\",\"partitions\":1,\"replicationFactor\":1},{\"topic\":\"atest\",\"partitions\":1,\"replicationFactor\":1},{\"topic\":\"testCommit\",\"partitions\":1,\"replicationFactor\":1},{\"topic\":\"testRollback\",\"partitions\":1,\"replicationFactor\":1}]", | ||
"wires" : [ [ "31c34a4.2603ab6" ] ] | ||
}; | ||
function testFlow(done,data,result) { | ||
const flow = [ | ||
broker, | ||
admin, | ||
producer, | ||
producerHL, | ||
Object.assign(consumer_test,{wires : [ [ "outHelper" ],["errorHelper"] ]}), | ||
Object.assign(consumer_atest,{wires : [ [ "outHelper" ],["errorHelper"] ]}), | ||
consumerGroup, | ||
{id :"outHelper", type : "helper"}, | ||
{id :"errorHelper", type : "helper"} | ||
]; | ||
helper.load(nodes, flow,function() { | ||
const brokerNode=getAndTestNodeProperties(broker); | ||
const adminNode=getAndTestNodeProperties(admin); | ||
const producerNode=getAndTestNodeProperties(producer); | ||
const producerHLNode=getAndTestNodeProperties(producerHL); | ||
const consumer_testNode=getAndTestNodeProperties(consumer_test); | ||
const consumer_atestNode=getAndTestNodeProperties(consumer_atest); | ||
const consumerGroupNode=getAndTestNodeProperties(consumerGroup); | ||
const outHelper = helper.getNode("outHelper"); | ||
const errorHelper = helper.getNode("errorHelper"); | ||
outHelper.on("input", function(msg) { | ||
done(); | ||
}); | ||
errorHelper.on("input", function(msg) { | ||
done("error check log output"); | ||
}); | ||
adminNode.receive(createTopics); | ||
done(); | ||
}); | ||
} | ||
describe('basic test', function() { | ||
beforeEach(function(done) { | ||
helper.startServer(done); | ||
}); | ||
afterEach(function(done) { | ||
helper.unload(); | ||
helper.stopServer(done); | ||
}); | ||
it('load objects', function(done) { | ||
testFlow(done); | ||
}); | ||
}); | ||
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
917613
1952
185