kafka-node
Advanced tools
Comparing version 0.2.5 to 0.2.6
@@ -64,3 +64,6 @@ 'use strict'; | ||
self.refreshBrokers(brokerMetadata); | ||
self.emit('brokersChanged'); | ||
// Emit after a 3 seconds | ||
setTimeout(function () { | ||
self.emit('brokersChanged'); | ||
}, 3000); | ||
}); | ||
@@ -228,2 +231,3 @@ } | ||
Client.prototype.refreshBrokers = function (brokerMetadata) { | ||
var self = this; | ||
this.brokerMetadata = brokerMetadata; | ||
@@ -236,2 +240,3 @@ deleteDeadBrokers(this.brokers); | ||
}).forEach(function (deadKey) { | ||
self.closeBrokers([brokers[deadKey]]); | ||
delete brokers[deadKey]; | ||
@@ -336,3 +341,3 @@ }.bind(this)); | ||
Client.prototype.updateMetadatas = function (metadatas) { | ||
_.extend(this.brokerMetadata, metadatas[0]); | ||
// _.extend(this.brokerMetadata, metadatas[0]); | ||
_.extend(this.topicMetadata, metadatas[1].metadata); | ||
@@ -339,0 +344,0 @@ for(var topic in this.topicMetadata) { |
@@ -196,16 +196,2 @@ 'use strict'; | ||
Zookeeper.prototype.getBrokerDetail = function (brokerId, cb) { | ||
var path = '/brokers/ids/' + brokerId; | ||
this.client.getData( | ||
path, | ||
function (error, data) { | ||
if (error) { | ||
console.log('Error occurred when getting data: %s.', error); | ||
} | ||
cb && cb(data); | ||
} | ||
); | ||
}; | ||
Zookeeper.prototype.listBrokers = function (cb) { | ||
@@ -225,23 +211,21 @@ var that = this; | ||
var brokers = {}; | ||
if (!that.inited) { | ||
var brokerId = children.shift(); | ||
that.getBrokerDetail(brokerId, function (data) { | ||
brokers[brokerId] = JSON.parse(data.toString()); | ||
function getBrokerDetail (id, cb) { | ||
var path = '/brokers/ids/' + id; | ||
that.client.getData(path,function (err, data) { | ||
if (err) return cb(err); | ||
brokers[id] = JSON.parse(data.toString()); | ||
cb(); | ||
}); | ||
} | ||
async.each(children, getBrokerDetail, function (err) { | ||
if (err) return cb(err); | ||
if (!that.inited) { | ||
that.emit('init', brokers); | ||
that.inited = true; | ||
cb && cb(brokers); //For test | ||
}) | ||
} else { | ||
var count = 0; | ||
children.forEach(function (brokerId) { | ||
that.getBrokerDetail(brokerId, function (data) { | ||
brokers[brokerId] = JSON.parse(data.toString()); | ||
if (++count == children.length) { | ||
that.emit('brokersChanged', brokers) | ||
cb && cb(brokers); //For test | ||
} | ||
}) | ||
}) | ||
} | ||
} else { | ||
that.emit('brokersChanged', brokers) | ||
} | ||
cb && cb(brokers); //For test | ||
}); | ||
} else { | ||
@@ -257,3 +241,2 @@ if (that.inited) | ||
Zookeeper.prototype.listConsumers = function (groupId) { | ||
@@ -260,0 +243,0 @@ var that = this; |
{ | ||
"name": "kafka-node", | ||
"description": "node client for Apache kafka, only support kafka 0.8 and above", | ||
"version": "0.2.5", | ||
"version": "0.2.6", | ||
"main": "kafka.js", | ||
@@ -6,0 +6,0 @@ "dependencies": { |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
128951
3017