Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

kafka-node

Package Overview
Dependencies
Maintainers
1
Versions
113
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

kafka-node - npm Package Compare versions

Comparing version 0.2.8 to 0.2.9

24

lib/consumer.js

@@ -166,3 +166,4 @@ 'use strict';

Consumer.prototype.addTopics = function (topics, cb) {
Consumer.prototype.addTopics = function (topics, cb, fromOffset) {
fromOffset = !!fromOffset;
var self = this;

@@ -175,4 +176,13 @@ if (!this.ready) {

}
// The default is that the topics is a string array of topic names
var topicNames = topics;
// If the topics is actually an object and not string we assume it is an array of payloads
if (typeof topics[0] === 'object') {
topicNames = topics.map(function (p) { return p.topic; })
}
this.client.addTopics(
topics,
topicNames,
function (err, added) {

@@ -183,2 +193,12 @@ if (err) return cb && cb(err, added);

var reFetch = !self.payloads.length;
if (fromOffset) {
payloads.forEach(function (p) {
self.payloads.push(p);
});
if (reFetch) self.fetch();
cb && cb(null, added);
return;
}
// update offset of topics that will be added

@@ -185,0 +205,0 @@ self.fetchOffset(payloads, function (err, offsets) {

2

package.json
{
"name": "kafka-node",
"description": "node client for Apache kafka, only support kafka 0.8 and above",
"version": "0.2.8",
"version": "0.2.9",
"main": "kafka.js",

@@ -6,0 +6,0 @@ "dependencies": {

@@ -223,6 +223,7 @@ Kafka-node

### addTopics(topics, cb)
### addTopics(topics, cb, fromOffset)
Add topics to current consumer, if any topic to be added not exists, return error
* `topics`: **Array**, array of topics to add
* `cb`: **Function**,the callback
* `fromOffset`: **Boolean**, if ture, the consumer will fetch message from the specified offset, otherwise it will fetch message from the last commited offset of the topic.

@@ -234,2 +235,7 @@ Example:

});
or
consumer.addTopics([{ topic: 't1', offset: 10 }], function (err, added) {
}, true);
```

@@ -348,7 +354,7 @@

### addTopics(topics, cb)
### addTopics(topics, cb, fromOffset)
Add topics to current consumer, if any topic to be added not exists, return error
* `topics`: **Array**, array of topics to add
* `cb`: **Function**,the callback
* `fromOffset`: **Boolean**, if ture, the consumer will fetch message from the specified offset, otherwise it will fetch message from the last commited offset of the topic.

@@ -360,2 +366,7 @@ Example:

});
or
consumer.addTopics([{ topic: 't1', offset: 10 }], function (err, added) {
}, true);
```

@@ -362,0 +373,0 @@

@@ -12,2 +12,3 @@ 'use strict';

var host = process.env['KAFKA_TEST_HOST'] || '';
function noop() { console.log(arguments) }

@@ -24,3 +25,3 @@

before(function (done) {
client = new Client();
client = new Client(host);
producer = new Producer(client);

@@ -77,3 +78,3 @@ offset = new Offset(client);

var client = new Client();
var client = new Client(host);
var consumer = new Consumer(client, topics, options);

@@ -103,2 +104,14 @@ consumer.on('offsetOutOfRange', function (topic) {

});
it('should return error when using payload as well', function (done) {
var options = { autoCommit: false, groupId: '_groupId_1_test' },
topics = [{topic: '_not_exist_topic_1_test', offset: 42}];
var consumer = new Consumer(client, [], options);
consumer.on('error', noop);
consumer.addTopics(topics, function (err, data) {
err.should.equal(1);
data.should.eql(topics.map(function(p) {return p.topic;}));
done();
}, true);
});
});

@@ -118,2 +131,14 @@

});
it('should add with given offset', function (done) {
var options = { autoCommit: false, groupId: '_groupId_addTopics_test' },
topics = [{topic: '_exist_topic_2_test', offset: 42}];
var consumer = new Consumer(client, [], options);
consumer.on('error', noop);
consumer.addTopics(topics, function (err, data) {
data.should.eql(topics.map(function(p) {return p.topic;}));
consumer.payloads.some(function (p) { return p.topic === topics[0].topic && p.offset === topics[0].offset; }).should.be.ok;
done();
}, true);
});
});

@@ -157,3 +182,3 @@ });

var client = new Client();
var client = new Client(host);
var consumer = new Consumer(client, topics, options);

@@ -230,3 +255,3 @@ var count = 0;

it('should close the consumer', function (done) {
var client = new Client(),
var client = new Client(host),
topics = [ { topic: '_exist_topic_2_test' } ],

@@ -244,3 +269,3 @@ options = { autoCommit: false, groupId: '_groupId_close_test' };

it('should commit the offset if force', function (done) {
var client = new Client(),
var client = new Client(host),
topics = [ { topic: '_exist_topic_2_test' } ],

@@ -247,0 +272,0 @@ force = true,

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc