kafka-node
Advanced tools
Comparing version 0.2.8 to 0.2.9
@@ -166,3 +166,4 @@ 'use strict'; | ||
Consumer.prototype.addTopics = function (topics, cb) { | ||
Consumer.prototype.addTopics = function (topics, cb, fromOffset) { | ||
fromOffset = !!fromOffset; | ||
var self = this; | ||
@@ -175,4 +176,13 @@ if (!this.ready) { | ||
} | ||
// The default is that the topics is a string array of topic names | ||
var topicNames = topics; | ||
// If the topics is actually an object and not string we assume it is an array of payloads | ||
if (typeof topics[0] === 'object') { | ||
topicNames = topics.map(function (p) { return p.topic; }) | ||
} | ||
this.client.addTopics( | ||
topics, | ||
topicNames, | ||
function (err, added) { | ||
@@ -183,2 +193,12 @@ if (err) return cb && cb(err, added); | ||
var reFetch = !self.payloads.length; | ||
if (fromOffset) { | ||
payloads.forEach(function (p) { | ||
self.payloads.push(p); | ||
}); | ||
if (reFetch) self.fetch(); | ||
cb && cb(null, added); | ||
return; | ||
} | ||
// update offset of topics that will be added | ||
@@ -185,0 +205,0 @@ self.fetchOffset(payloads, function (err, offsets) { |
{ | ||
"name": "kafka-node", | ||
"description": "node client for Apache kafka, only support kafka 0.8 and above", | ||
"version": "0.2.8", | ||
"version": "0.2.9", | ||
"main": "kafka.js", | ||
@@ -6,0 +6,0 @@ "dependencies": { |
@@ -223,6 +223,7 @@ Kafka-node | ||
### addTopics(topics, cb) | ||
### addTopics(topics, cb, fromOffset) | ||
Add topics to the current consumer; if any topic to be added does not exist, return an error | ||
* `topics`: **Array**, array of topics to add | ||
* `cb`: **Function**, the callback | ||
* `fromOffset`: **Boolean**, if true, the consumer will fetch messages from the specified offset, otherwise it will fetch messages from the last committed offset of the topic. | ||
@@ -234,2 +235,7 @@ Example: | ||
}); | ||
or | ||
consumer.addTopics([{ topic: 't1', offset: 10 }], function (err, added) { | ||
}, true); | ||
``` | ||
@@ -348,7 +354,7 @@ | ||
### addTopics(topics, cb) | ||
### addTopics(topics, cb, fromOffset) | ||
Add topics to the current consumer; if any topic to be added does not exist, return an error | ||
* `topics`: **Array**, array of topics to add | ||
* `cb`: **Function**, the callback | ||
* `fromOffset`: **Boolean**, if true, the consumer will fetch messages from the specified offset, otherwise it will fetch messages from the last committed offset of the topic. | ||
@@ -360,2 +366,7 @@ Example: | ||
}); | ||
or | ||
consumer.addTopics([{ topic: 't1', offset: 10 }], function (err, added) { | ||
}, true); | ||
``` | ||
@@ -362,0 +373,0 @@ |
@@ -12,2 +12,3 @@ 'use strict'; | ||
var host = process.env['KAFKA_TEST_HOST'] || ''; | ||
function noop() { console.log(arguments) } | ||
@@ -24,3 +25,3 @@ | ||
before(function (done) { | ||
client = new Client(); | ||
client = new Client(host); | ||
producer = new Producer(client); | ||
@@ -77,3 +78,3 @@ offset = new Offset(client); | ||
var client = new Client(); | ||
var client = new Client(host); | ||
var consumer = new Consumer(client, topics, options); | ||
@@ -103,2 +104,14 @@ consumer.on('offsetOutOfRange', function (topic) { | ||
}); | ||
it('should return error when using payload as well', function (done) { | ||
var options = { autoCommit: false, groupId: '_groupId_1_test' }, | ||
topics = [{topic: '_not_exist_topic_1_test', offset: 42}]; | ||
var consumer = new Consumer(client, [], options); | ||
consumer.on('error', noop); | ||
consumer.addTopics(topics, function (err, data) { | ||
err.should.equal(1); | ||
data.should.eql(topics.map(function(p) {return p.topic;})); | ||
done(); | ||
}, true); | ||
}); | ||
}); | ||
@@ -118,2 +131,14 @@ | ||
}); | ||
it('should add with given offset', function (done) { | ||
var options = { autoCommit: false, groupId: '_groupId_addTopics_test' }, | ||
topics = [{topic: '_exist_topic_2_test', offset: 42}]; | ||
var consumer = new Consumer(client, [], options); | ||
consumer.on('error', noop); | ||
consumer.addTopics(topics, function (err, data) { | ||
data.should.eql(topics.map(function(p) {return p.topic;})); | ||
consumer.payloads.some(function (p) { return p.topic === topics[0].topic && p.offset === topics[0].offset; }).should.be.ok; | ||
done(); | ||
}, true); | ||
}); | ||
}); | ||
@@ -157,3 +182,3 @@ }); | ||
var client = new Client(); | ||
var client = new Client(host); | ||
var consumer = new Consumer(client, topics, options); | ||
@@ -230,3 +255,3 @@ var count = 0; | ||
it('should close the consumer', function (done) { | ||
var client = new Client(), | ||
var client = new Client(host), | ||
topics = [ { topic: '_exist_topic_2_test' } ], | ||
@@ -244,3 +269,3 @@ options = { autoCommit: false, groupId: '_groupId_close_test' }; | ||
it('should commit the offset if force', function (done) { | ||
var client = new Client(), | ||
var client = new Client(host), | ||
topics = [ { topic: '_exist_topic_2_test' } ], | ||
@@ -247,0 +272,0 @@ force = true, |
License Policy Violation
License: This package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Environment variable access
Supply chain risk: Package accesses environment variables, which may be a sign of credential stuffing or data theft.
Found 1 instance in 1 package
License Policy Violation
License: This package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
131717
3064
528
13