Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

kafka-node

Package Overview
Dependencies
Maintainers
1
Versions
113
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

kafka-node - npm Package Compare versions

Comparing version 0.1.7 to 0.1.8

test/manual.gracefulexit.js

18

lib/client.js

@@ -46,2 +46,16 @@ 'use strict';

Client.prototype.close = function (cb) {
this.closeBrokers(this.brokers);
this.closeBrokers(this.longpollingBrokers);
this.zk.client.close();
cb && cb();
}
Client.prototype.closeBrokers = function (brokers) {
_.each(brokers, function (broker) {
broker.closing = true;
broker.end();
});
}
Client.prototype.sendFetchRequest = function (consumer, payloads, fetchMaxWaitMs, fetchMinBytes) {

@@ -218,3 +232,3 @@ var topics = {},

var broker = this.brokerForLeader(leader, longpolling);
if (broker.error) return cb('Leader not available', payloads[leader]);
if (broker.error || broker.closing) return cb('Leader not available', payloads[leader]);
if (longpolling) {

@@ -326,3 +340,3 @@ if (broker.waitting) continue;

function retry(s) {
if(s.retrying) return;
if(s.retrying || s.closing) return;
s.retrying = true;

@@ -329,0 +343,0 @@ s.error = true;

32

lib/consumer.js

@@ -115,3 +115,3 @@ 'use strict';

self.updateOffsets(topics);
self.updateOffsets(topics, true);
self.fetch();

@@ -124,4 +124,6 @@ });

* Update offset info in current payloads
* @param {Object} Topic-partition-offset
* @param {Boolean} Don't commit when initing consumer
*/
Consumer.prototype.updateOffsets = function (topics) {
Consumer.prototype.updateOffsets = function (topics, initing) {
this.payloads.forEach(function (p) {

@@ -132,3 +134,3 @@ if (!_.isEmpty(topics[p.topic]) && topics[p.topic][p.partition] !== undefined)

if (this.options.autoCommit) this.autoCommit();
if (this.options.autoCommit && !initing) this.autoCommit();
}

@@ -147,7 +149,10 @@

out.push(_.defaults({ offset: p.offset-1 }, p));
else out.push(p);
return out;
}, []);
this.client.sendOffsetCommitRequest(this.options.groupId, commits, cb);
if (commits.length) {
this.client.sendOffsetCommitRequest(this.options.groupId, commits, cb);
} else {
cb && cb();
}
}

@@ -201,5 +206,16 @@ Consumer.prototype.commit = Consumer.prototype.autoCommit = autoCommit;

Consumer.prototype.close = function (force) {
Consumer.prototype.close = function (force, cb) {
this.ready = false;
if (force) this.commit();
if (typeof force === 'function') {
cb = force;
force = false;
}
if (force) {
this.commit(force, function (err) {
this.client.close(cb);
}.bind(this));
} else {
this.client.close(cb);
}
}

@@ -206,0 +222,0 @@

{
"name": "kafka-node",
"description": "node client for Apache kafka, only support kafka 0.8 and above",
"version": "0.1.7",
"version": "0.1.8",
"main": "kafka.js",

@@ -14,3 +14,3 @@ "dependencies": {

"devDependencies": {
"mocha": "~1.12.0",
"mocha": "^1.18.2",
"should": "~1.2.2",

@@ -17,0 +17,0 @@ "line-by-line": "~0.1.1",

Kafka-node
==========
Kafka-node is a nodejs client with zookeeper integration for apache Kafka, only support the latest version of kafka 0.8 which is still under development, so this module
is `not production ready` so far.
Zookeeper does the following jobs:
Kafka-node is a nodejs client with Zookeeper integration for apache Kafka. It only supports the latest version of Kafka 0.8 which is still under development, so this module
is _not production ready_ so far.
* Load broker metadata from zookeeper before we can communicate with kafka server
* Watch broker state, if broker changed, client will refresh broker and topic metadata stored in client
The Zookeeper integration does the following jobs:
# Install kafka
* Loads broker metadata from Zookeeper before we can communicate with the Kafka server
* Watches broker state, if broker changes, the client will refresh broker and topic metadata stored in the client
# Install Kafka
Follow the [instructions](https://cwiki.apache.org/KAFKA/kafka-08-quick-start.html) on the Kafka wiki to build Kafka 0.8 and get a test broker up and running.

@@ -21,5 +22,10 @@

### close(cb)
Closes the connection to Zookeeper and the brokers so that the node process can exit gracefully.
* `cb`: **Function**, the callback
## Producer
### Producer(client)
* `client`: client which keep connect with kafka server.
* `client`: client which keeps a connection with the Kafka server.

@@ -61,7 +67,7 @@ ``` js

});
})
});
```
### createTopics(topics, async, cb)
This method is used to create topics in kafka server, only work when kafka server set `auto.create.topics.enable` true, our client simply send a metadata request to let server auto crate topics. when `async` set false, this method does not return util all topics are created, otherwise return immediately.
This method is used to create topics on the Kafka server. It only work when `auto.create.topics.enable`, on the Kafka server, is set to true. Our client simply sends a metadata request to the server which will auto create topics. When `async` is set to false, this method does not return until all topics are created, otherwise it returns immediately.

@@ -90,3 +96,3 @@ * `topics`: **Array**,array of topics

### Consumer(client, payloads, options)
* `client`: client which keep connect with kafka server.
* `client`: client which keeps a connection with the Kafka server.
* `payloads`: **Array**,array of `FetchRequest`, `FetchRequest` is a JSON object like:

@@ -138,3 +144,3 @@

### on('message', onMessage);
By default, we will consume message from the last committed offset of the current group
By default, we will consume messages from the last committed offset of the current group

@@ -207,3 +213,3 @@ * `onMessage`: **Function**, callback when new message comes

### close(force)
### close(force, cb)
* `force`: **Boolean**, if set true, it force commit current offset before close, default false

@@ -214,3 +220,4 @@

```js
consumer.close(true);
consumer.close(true, cb);
consuemr.close(cb); //force is force
```

@@ -220,3 +227,3 @@

### Offset(client)
* `client`: client which keep connect with kafka server.
* `client`: client which keeps a connection with the Kafka server.

@@ -278,3 +285,3 @@ ### fetch(payloads, cb)

### fetchcommits(groupid, payloads, cb)
### fetchCommits(groupid, payloads, cb)
Fetch the last committed offset in a topic of a specific consumer group

@@ -281,0 +288,0 @@

@@ -150,2 +150,3 @@ 'use strict';

var client = new Client();
var consumer = new Consumer(client, topics, options);

@@ -219,2 +220,31 @@ var count = 0;

});
describe('#close', function () {
it('should close the consumer', function (done) {
var client = new Client(),
topics = [ { topic: '_exist_topic_2_test' } ],
options = { autoCommit: false, groupId: '_groupId_close_test' };
var consumer = new Consumer(client, topics, options);
consumer.once('message', function (message) {
consumer.close(function (err) {
done(err);
});
});
});
it('should commit the offset if force', function (done) {
var client = new Client(),
topics = [ { topic: '_exist_topic_2_test' } ],
force = true,
options = { autoCommit: false, groupId: '_groupId_close_test' };
var consumer = new Consumer(client, topics, options);
consumer.once('message', function (message) {
consumer.close(force, function (err) {
done(err);
});
});
});
});
});
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc