kafka-node
Advanced tools
Comparing version 0.2.10 to 0.2.11
@@ -11,3 +11,3 @@ 'use strict'; | ||
var client = new Client(); | ||
var client = new Client('localhost:2181'); | ||
var topics = [ | ||
@@ -14,0 +14,0 @@ {topic: topic, partition: 1}, |
@@ -9,3 +9,3 @@ 'use strict'; | ||
var topic = argv.topic || 'topic1'; | ||
var client = new Client(); | ||
var client = new Client('localhost:2181'); | ||
var topics = [ { topic: topic }]; | ||
@@ -12,0 +12,0 @@ var options = { autoCommit: false, fromBeginning: false, fetchMaxWaitMs: 1000, fetchMaxBytes: 1024*1024 }; |
var kafka = require('../kafka'), | ||
Producer = kafka.Producer, | ||
Client = kafka.Client, | ||
client = new Client(); | ||
client = new Client('localhost:2181'); | ||
@@ -16,2 +16,6 @@ var argv = require('optimist').argv; | ||
producer.on('error', function (err) { | ||
console.log('error', err) | ||
}) | ||
function send(message) { | ||
@@ -18,0 +22,0 @@ for (var i = 0; i < count; i++) { |
@@ -40,2 +40,3 @@ 'use strict'; | ||
this.ready = false; | ||
this.paused = this.options.paused; | ||
this.id = nextId(); | ||
@@ -159,3 +160,3 @@ this.payloads = this.buildPayloads(topics); | ||
Consumer.prototype.fetch = function () { | ||
if (!this.ready) return; | ||
if (!this.ready || this.paused) return; | ||
this.client.sendFetchRequest(this, this.payloads, this.options.fetchMaxWaitMs, this.options.fetchMinBytes); | ||
@@ -253,2 +254,11 @@ } | ||
Consumer.prototype.pause = function () { | ||
this.paused = true; | ||
} | ||
Consumer.prototype.resume = function () { | ||
this.paused = false; | ||
this.fetch(); | ||
} | ||
module.exports = Consumer; |
@@ -425,21 +425,7 @@ 'use strict'; | ||
HighLevelConsumer.prototype.fetch = function () { | ||
if (!this.ready) { | ||
if (!this.ready || this.rebalancing || this.paused) { | ||
return; | ||
} | ||
if (this.rebalancing) return; | ||
// If paused request 0 data - need to pause in this way otherwise the connection is dropped | ||
if (this.paused) { | ||
// Build empty payloads on demand | ||
var empty_payloads = this.topicPayloads.map(function (j) { | ||
var k = { topic: j.topic, partition: j.partition || 0, offset: j.offset || 0, maxBytes: 0, metadata: 'm'}; | ||
return k; | ||
}); | ||
this.client.sendFetchRequest(this, empty_payloads, this.options.fetchMaxWaitMs, 0); | ||
} | ||
else { | ||
this.client.sendFetchRequest(this, this.topicPayloads, this.options.fetchMaxWaitMs, this.options.fetchMinBytes, this.options.maxTickMessages); | ||
} | ||
this.client.sendFetchRequest(this, this.topicPayloads, this.options.fetchMaxWaitMs, this.options.fetchMinBytes, this.options.maxTickMessages); | ||
} | ||
@@ -548,4 +534,5 @@ | ||
this.paused = false; | ||
this.fetch(); | ||
} | ||
module.exports = HighLevelConsumer; |
{ | ||
"name": "kafka-node", | ||
"description": "node client for Apache kafka, only support kafka 0.8 and above", | ||
"version": "0.2.10", | ||
"version": "0.2.11", | ||
"main": "kafka.js", | ||
@@ -6,0 +6,0 @@ "dependencies": { |
@@ -67,2 +67,4 @@ Kafka-node | ||
}); | ||
producer.on('error', function (err) {}) | ||
``` | ||
@@ -280,2 +282,8 @@ | ||
### pause() | ||
Pause the consumer | ||
### resume() | ||
Resume the consumer | ||
### close(force, cb) | ||
@@ -282,0 +290,0 @@ * `force`: **Boolean**, if set true, it force commit current offset before close, default false |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
536
132042