kafka-node
Advanced tools
Comparing version 0.2.12 to 0.2.13
@@ -11,3 +11,3 @@ 'use strict'; | ||
var topics = [ { topic: topic }]; | ||
var options = { autoCommit: false, fromBeginning: false, fetchMaxWaitMs: 1000, fetchMaxBytes: 1024*1024 }; | ||
var options = { autoCommit: true, fromBeginning: false, fetchMaxWaitMs: 1000, fetchMaxBytes: 1024*1024 }; | ||
var consumer = new HighLevelConsumer(client, topics, options); | ||
@@ -28,2 +28,2 @@ var offset = new Offset(client); | ||
}); | ||
}); | ||
}); |
@@ -253,4 +253,5 @@ 'use strict'; | ||
var error = err || resp[1].error; | ||
retry++; | ||
if (error) { | ||
retry++; | ||
if (retry === 10) cb(error); | ||
@@ -260,6 +261,12 @@ else setTimeout(function () { | ||
}, 1000); | ||
return; | ||
} else { | ||
if (retry > 2) { | ||
self.updateMetadatas(resp); | ||
cb(); | ||
} else { | ||
setTimeout(function () { | ||
self.refreshMetadata(topicNames, retry, cb) | ||
}, 1000); | ||
} | ||
} | ||
self.updateMetadatas(resp); | ||
cb(); | ||
}); | ||
@@ -266,0 +273,0 @@ } |
@@ -82,3 +82,4 @@ 'use strict'; | ||
}); | ||
this.refreshMetadata(topicNames, 0, function () { | ||
this.refreshMetadata(topicNames, 0, function (err) { | ||
if (err) return self.emit('error', err); | ||
self.fetch(); | ||
@@ -85,0 +86,0 @@ }); |
@@ -108,5 +108,2 @@ 'use strict'; | ||
this.on('ready', function () { | ||
self.ready = true; | ||
register(); | ||
self.fetchOffset(self.topicPayloads, function (err, topics) { | ||
@@ -117,2 +114,3 @@ if (err) { | ||
self.ready = true; | ||
self.updateOffsets(topics, true); | ||
@@ -176,5 +174,14 @@ self.fetch(); | ||
else { | ||
self.rebalancing = false; | ||
self.ready = true; | ||
return self.emit('ready'); | ||
var topicNames = self.topicPayloads.map(function (p) { | ||
return p.topic; | ||
}); | ||
self.client.refreshMetadata(topicNames, 0, function (err) { | ||
register(); | ||
self.rebalancing = false; | ||
if (err) { | ||
self.emit('error', err); | ||
} else { | ||
self.emit('ready'); | ||
} | ||
}); | ||
} | ||
@@ -191,16 +198,7 @@ }); | ||
var reCallback = function () { | ||
if (self.ready) { | ||
rebalance(); | ||
} | ||
else console.log("Consumer or Broker Changed not ready"); | ||
}; | ||
function register() { | ||
console.log("Registered listeners"); | ||
// Register for re-balances (broker or consumer changes) | ||
self.client.zk.on('consumersChanged', reCallback); | ||
self.client.zk.on('brokersChanged', reCallback); | ||
self.client.zk.on('consumersChanged', rebalance); | ||
self.client.zk.on('brokersChanged', rebalance); | ||
} | ||
@@ -211,4 +209,4 @@ | ||
// Register for re-balances (broker or consumer changes) | ||
self.client.zk.removeListener('consumersChanged', reCallback); | ||
self.client.zk.removeListener('brokersChanged', reCallback); | ||
self.client.zk.removeListener('consumersChanged', rebalance); | ||
self.client.zk.removeListener('brokersChanged', rebalance); | ||
} | ||
@@ -228,11 +226,2 @@ | ||
this.client.on('brokersChanged', function () { | ||
var topicNames = self.topicPayloads.map(function (p) { | ||
return p.topic; | ||
}); | ||
this.refreshMetadata(topicNames, 0, function () { | ||
self.fetch(); | ||
}); | ||
}); | ||
this.on('offsetOutOfRange', function (topic) { | ||
@@ -239,0 +228,0 @@ topic.maxNum = self.options.maxNumSegments; |
{ | ||
"name": "kafka-node", | ||
"description": "node client for Apache kafka, only support kafka 0.8 and above", | ||
"version": "0.2.12", | ||
"version": "0.2.13", | ||
"main": "kafka.js", | ||
@@ -6,0 +6,0 @@ "dependencies": { |
Kafka-node | ||
========== | ||
Kafka-node is a nodejs client with Zookeeper integration for apache Kafka. It only supports the latest version of Kafka 0.8 which is still under development, so this module | ||
Kafka-node is a Node.js client with Zookeeper integration for Apache Kafka. It only supports the latest version of Kafka 0.8 which is still under development, so this module | ||
is _not production ready_ so far. | ||
@@ -294,3 +294,3 @@ | ||
consumer.close(true, cb); | ||
consuemr.close(cb); //force is force | ||
consumer.close(cb); //force is force | ||
``` | ||
@@ -424,3 +424,3 @@ | ||
consumer.close(true, cb); | ||
consuemr.close(cb); //force is force | ||
consumer.close(cb); //force is force | ||
``` | ||
@@ -427,0 +427,0 @@ |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
136860
3181