kafka-node
Comparing version 0.1.7 to 0.1.8
@@ -46,2 +46,16 @@ 'use strict';
+Client.prototype.close = function (cb) {
+  this.closeBrokers(this.brokers);
+  this.closeBrokers(this.longpollingBrokers);
+  this.zk.client.close();
+  cb && cb();
+}
+Client.prototype.closeBrokers = function (brokers) {
+  _.each(brokers, function (broker) {
+    broker.closing = true;
+    broker.end();
+  });
+}
 Client.prototype.sendFetchRequest = function (consumer, payloads, fetchMaxWaitMs, fetchMinBytes) {
@@ -218,3 +232,3 @@ var topics = {},
   var broker = this.brokerForLeader(leader, longpolling);
-  if (broker.error) return cb('Leader not available', payloads[leader]);
+  if (broker.error || broker.closing) return cb('Leader not available', payloads[leader]);
   if (longpolling) {
@@ -326,3 +340,3 @@ if (broker.waitting) continue;
   function retry(s) {
-    if(s.retrying) return;
+    if(s.retrying || s.closing) return;
     s.retrying = true;
@@ -329,0 +343,0 @@ s.error = true;
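The two methods added above give the client a teardown path. A hedged usage sketch follows, assuming the `kafka-node` exports and the `Client(connectionString)` constructor that the README below documents; the connection string is a placeholder.

```js
var kafka = require('kafka-node');

// Placeholder Zookeeper connection string.
var client = new kafka.Client('localhost:2181');

// In real code this would run after the last request has been answered.
// close() ends every cached broker socket (each broker is flagged with
// `closing` so the retry and leader-lookup paths above skip it), closes the
// Zookeeper session, and then invokes the optional callback.
client.close(function () {
  console.log('broker sockets and Zookeeper session closed');
});
```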
@@ -115,3 +115,3 @@ 'use strict';
-  self.updateOffsets(topics);
+  self.updateOffsets(topics, true);
   self.fetch();
@@ -124,4 +124,6 @@ });
  * Update offset info in current payloads
  * @param {Object} Topic-partition-offset
+ * @param {Boolean} Don't commit when initing consumer
  */
-Consumer.prototype.updateOffsets = function (topics) {
+Consumer.prototype.updateOffsets = function (topics, initing) {
   this.payloads.forEach(function (p) {
@@ -132,3 +134,3 @@ if (!_.isEmpty(topics[p.topic]) && topics[p.topic][p.partition] !== undefined)
-  if (this.options.autoCommit) this.autoCommit();
+  if (this.options.autoCommit && !initing) this.autoCommit();
 }
@@ -147,7 +149,10 @@
       out.push(_.defaults({ offset: p.offset-1 }, p));
     else out.push(p);
     return out;
   }, []);
-  this.client.sendOffsetCommitRequest(this.options.groupId, commits, cb);
+  if (commits.length) {
+    this.client.sendOffsetCommitRequest(this.options.groupId, commits, cb);
+  } else {
+    cb && cb();
+  }
 }
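The new guard around `sendOffsetCommitRequest` means a commit with nothing dirty no longer goes to the broker at all. A minimal sketch, assuming a `consumer` instance built as in the README examples:

```js
consumer.commit(function (err, data) {
  // With no dirty payloads, 0.1.8 skips the OffsetCommit request and simply
  // invokes this callback, so callers can always rely on it firing.
  if (err) return console.error('commit failed', err);
  console.log('offsets committed (or nothing to commit)', data);
});
```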
@@ -201,5 +206,16 @@ Consumer.prototype.commit = Consumer.prototype.autoCommit = autoCommit;
-Consumer.prototype.close = function (force) {
+Consumer.prototype.close = function (force, cb) {
   this.ready = false;
-  if (force) this.commit();
+  if (typeof force === 'function') {
+    cb = force;
+    force = false;
+  }
+  if (force) {
+    this.commit(force, function (err) {
+      this.client.close(cb);
+    }.bind(this));
+  } else {
+    this.client.close(cb);
+  }
 }
@@ -206,0 +222,0 @@
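The rewritten `close` normalizes its arguments (a single function argument is treated as the callback, with `force` defaulting to false) and finishes by calling the new `Client#close`, so closing a consumer now also tears down the broker and Zookeeper connections. A hedged sketch of both call forms; the connection string, topic, and group id are placeholders:

```js
var kafka = require('kafka-node');
var client = new kafka.Client('localhost:2181');
var consumer = new kafka.Consumer(
  client,
  [ { topic: 'example_topic' } ],                  // placeholder topic
  { autoCommit: false, groupId: 'example_group' }  // placeholder group
);

// Force a commit of the current offsets, then close the client so the
// process can exit.
consumer.close(true, function (err) {
  if (err) console.error('close failed', err);
});

// Alternatively (not in addition), omit `force`; the lone function argument
// is taken as the callback and no commit is forced.
consumer.close(function (err) { /* ... */ });
```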
 {
   "name": "kafka-node",
   "description": "node client for Apache kafka, only support kafka 0.8 and above",
-  "version": "0.1.7",
+  "version": "0.1.8",
   "main": "kafka.js",
@@ -14,3 +14,3 @@ "dependencies": {
   "devDependencies": {
-    "mocha": "~1.12.0",
+    "mocha": "^1.18.2",
     "should": "~1.2.2",
@@ -17,0 +17,0 @@ "line-by-line": "~0.1.1",
 Kafka-node
 ==========
-Kafka-node is a nodejs client with zookeeper integration for apache Kafka, only support the latest version of kafka 0.8 which is still under development, so this module
-is `not production ready` so far.
-Zookeeper does the following jobs:
-* Load broker metadata from zookeeper before we can communicate with kafka server
-* Watch broker state, if broker changed, client will refresh broker and topic metadata stored in client
-# Install kafka
+Kafka-node is a nodejs client with Zookeeper integration for apache Kafka. It only supports the latest version of Kafka 0.8 which is still under development, so this module
+is _not production ready_ so far.
+The Zookeeper integration does the following jobs:
+* Loads broker metadata from Zookeeper before we can communicate with the Kafka server
+* Watches broker state, if broker changes, the client will refresh broker and topic metadata stored in the client
+# Install Kafka
 Follow the [instructions](https://cwiki.apache.org/KAFKA/kafka-08-quick-start.html) on the Kafka wiki to build Kafka 0.8 and get a test broker up and running.
@@ -21,5 +22,10 @@
+### close(cb)
+Closes the connection to Zookeeper and the brokers so that the node process can exit gracefully.
+* `cb`: **Function**, the callback
 ## Producer
 ### Producer(client)
-* `client`: client which keep connect with kafka server.
+* `client`: client which keeps a connection with the Kafka server.
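The `close(cb)` entry just added documents the Client method introduced in this release. A hedged end-to-end sketch; the connection string and topic are placeholders, and the ready/send flow follows the producer examples elsewhere in the README:

```js
var kafka = require('kafka-node'),
    Producer = kafka.Producer,
    client = new kafka.Client('localhost:2181'),
    producer = new Producer(client);

producer.on('ready', function () {
  producer.send([ { topic: 'example_topic', messages: ['hello'] } ], function (err, data) {
    // Once the last request is answered, close the Zookeeper session and
    // broker sockets so the node process can exit instead of hanging.
    client.close(function () {
      console.log('client closed', err || data);
    });
  });
});
```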
@@ -61,7 +67,7 @@ ``` js
   });
-})
+});
 ```
 ### createTopics(topics, async, cb)
-This method is used to create topics in kafka server, only work when kafka server set `auto.create.topics.enable` true, our client simply send a metadata request to let server auto crate topics. when `async` set false, this method does not return util all topics are created, otherwise return immediately.
+This method is used to create topics on the Kafka server. It only works when `auto.create.topics.enable` is set to true on the Kafka server. Our client simply sends a metadata request to the server which will auto create topics. When `async` is set to false, this method does not return until all topics are created, otherwise it returns immediately.
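A hedged sketch of the two modes described above; the client setup mirrors the earlier examples and the topic names are placeholders:

```js
var kafka = require('kafka-node'),
    client = new kafka.Client('localhost:2181'),
    producer = new kafka.Producer(client);

producer.on('ready', function () {
  // async = false: the callback fires only once the topics actually exist.
  producer.createTopics(['topic1', 'topic2'], false, function (err, data) {
    console.log('topics created', err || data);
  });

  // async = true: the metadata request is sent and the callback returns
  // immediately; the broker creates the topics lazily.
  producer.createTopics(['topic3'], true, function (err, data) {});
});
```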
@@ -90,3 +96,3 @@ * `topics`: **Array**, array of topics
 ### Consumer(client, payloads, options)
-* `client`: client which keep connect with kafka server.
+* `client`: client which keeps a connection with the Kafka server.
 * `payloads`: **Array**, array of `FetchRequest`, `FetchRequest` is a JSON object like:
@@ -138,3 +144,3 @@
 ### on('message', onMessage);
-By default, we will consume message from the last committed offset of the current group
+By default, we will consume messages from the last committed offset of the current group
@@ -207,3 +213,3 @@ * `onMessage`: **Function**, callback when new message comes
-### close(force)
+### close(force, cb)
 * `force`: **Boolean**, if set to true, it forces a commit of the current offset before closing; default false
@@ -214,3 +220,4 @@
 ```js
-consumer.close(true);
+consumer.close(true, cb);
+consumer.close(cb); // force is false by default
 ```
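One place the extra callback pays off is process shutdown. A hedged sketch; the signal handler and the `consumer` instance are illustrative and not part of the library:

```js
process.on('SIGINT', function () {
  // Force-commit the current offsets, close the broker sockets and the
  // Zookeeper session, then let the process exit.
  consumer.close(true, function () {
    process.exit();
  });
});
```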
@@ -220,3 +227,3 @@
 ### Offset(client)
-* `client`: client which keep connect with kafka server.
+* `client`: client which keeps a connection with the Kafka server.
@@ -278,3 +285,3 @@ ### fetch(payloads, cb)
-### fetchcommits(groupid, payloads, cb)
+### fetchCommits(groupid, payloads, cb)
 Fetch the last committed offset in a topic of a specific consumer group
@@ -281,0 +288,0 @@
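The renamed `fetchCommits` reads the last committed offsets for a consumer group. A hedged sketch; the group id, topic, and payload shape are assumptions modeled on the Offset examples elsewhere in the README:

```js
var kafka = require('kafka-node'),
    client = new kafka.Client('localhost:2181'),
    offset = new kafka.Offset(client);

offset.fetchCommits('example_group', [
  { topic: 'example_topic', partition: 0 }   // assumed payload shape
], function (err, data) {
  // data is expected to map topic -> partition -> last committed offset.
  console.log(err || data);
});
```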
@@ -150,2 +150,3 @@ 'use strict';
 var client = new Client();
 var consumer = new Consumer(client, topics, options);
@@ -219,2 +220,31 @@ var count = 0;
   });
+  describe('#close', function () {
+    it('should close the consumer', function (done) {
+      var client = new Client(),
+        topics = [ { topic: '_exist_topic_2_test' } ],
+        options = { autoCommit: false, groupId: '_groupId_close_test' };
+      var consumer = new Consumer(client, topics, options);
+      consumer.once('message', function (message) {
+        consumer.close(function (err) {
+          done(err);
+        });
+      });
+    });
+    it('should commit the offset if force', function (done) {
+      var client = new Client(),
+        topics = [ { topic: '_exist_topic_2_test' } ],
+        force = true,
+        options = { autoCommit: false, groupId: '_groupId_close_test' };
+      var consumer = new Consumer(client, topics, options);
+      consumer.once('message', function (message) {
+        consumer.close(force, function (err) {
+          done(err);
+        });
+      });
+    });
+  });
 });