![Build Status](https://travis-ci.org/oleksiyk/kafka.png)
# no-kafka
no-kafka is an Apache Kafka 0.9 client for Node.js with support for the new unified consumer API. No Zookeeper connection is required. Compression is not supported yet.

All methods return a promise.
## Using
- download and install Kafka
- create your test topic:
```
kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic kafka-test-topic --partitions 3 --replication-factor 1
```
### Producer
Example:
```javascript
var Kafka = require('no-kafka');
var producer = new Kafka.Producer();

return producer.init().then(function () {
    return producer.send({
        topic: 'kafka-test-topic',
        partition: 0,
        message: {
            value: 'Hello!'
        }
    });
})
.then(function (result) {
    // handle the produce result here
});
```
Producer options (see the example below):
- `requiredAcks` - number of acknowledgements required for a produce request. If it is 0, the server will not send any response. If it is 1 (default), the server will wait until the data is written to the local log before sending a response. If it is -1, the server will block until the message is committed by all in-sync replicas before sending a response. For any number > 1 the server will block waiting for that many acknowledgements to occur (but the server will never wait for more acknowledgements than there are in-sync replicas).
- `timeout` - timeout in ms for a produce request
- `clientId` - ID of this client, defaults to 'no-kafka-client'
- `connectionString` - comma delimited list of initial brokers, defaults to '127.0.0.1:9092'
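For example, a producer configured with explicit options might look like this (the option values below are illustrative, not recommendations):

```javascript
var Kafka = require('no-kafka');

var producer = new Kafka.Producer({
    requiredAcks: 1,                    // wait until the leader writes the message to its local log
    timeout: 100,                       // produce request timeout in ms
    clientId: 'my-producer',            // identifies this client to the brokers
    connectionString: '127.0.0.1:9092'  // initial broker(s) to connect to
});
```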
### Simple Consumer
Manually specify topic, partition and offset when subscribing. Suitable for simple use cases.
Example:
```javascript
var Kafka = require('no-kafka');
var consumer = new Kafka.SimpleConsumer();

consumer.on('data', function (messageSet, topic, partition) {
    messageSet.forEach(function (m) {
        console.log(topic, partition, m.offset, m.message.value.toString('utf8'));
    });
});

return consumer.init().then(function () {
    return Promise.all([
        consumer.subscribe('kafka-test-topic', 0),
        consumer.subscribe('kafka-test-topic', 1)
    ]);
});
```
Subscribe (or change subscription) to specific offset and limit maximum received MessageSet size:
```javascript
consumer.subscribe('kafka-test-topic', 0, {offset: 20, maxBytes: 30});
```
Subscribe to the latest or earliest offsets in the topic/partition:
```javascript
consumer.subscribe('kafka-test-topic', 0, {time: Kafka.LATEST_OFFSET});
consumer.subscribe('kafka-test-topic', 0, {time: Kafka.EARLIEST_OFFSET});
```
Commit offset(s) (V0, Kafka saves these commits to Zookeeper):

```javascript
consumer.commitOffset([
    {
        topic: 'kafka-test-topic',
        partition: 0,
        offset: 1
    },
    {
        topic: 'kafka-test-topic',
        partition: 1,
        offset: 2
    }
]);
```
Fetch committed offset(s):

```javascript
consumer.fetchOffset([
    {
        topic: 'kafka-test-topic',
        partition: 0
    },
    {
        topic: 'kafka-test-topic',
        partition: 1
    }
]).then(function (result) {
    // handle the fetched offsets here
});
```
Simple Consumer options (see the example below):
- `groupId` - group ID for committing and fetching offsets, defaults to 'no-kafka-group-v0'
- `timeout` - timeout for fetch requests, defaults to 100ms
- `idleTimeout` - timeout between fetch calls, defaults to 1000ms
- `minBytes` - minimum number of bytes to wait for from Kafka before returning the fetch call, defaults to 1 byte
- `maxBytes` - maximum size of messages in a fetch response
- `clientId` - ID of this client, defaults to 'no-kafka-client'
- `connectionString` - comma delimited list of initial brokers, defaults to '127.0.0.1:9092'
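For example, a consumer configured with explicit options might look like this (values are illustrative only):

```javascript
var Kafka = require('no-kafka');

var consumer = new Kafka.SimpleConsumer({
    groupId: 'my-group',                // group used when committing/fetching offsets
    timeout: 100,                       // fetch request timeout in ms
    idleTimeout: 1000,                  // pause between fetch calls in ms
    minBytes: 1,                        // return the fetch as soon as at least 1 byte is available
    maxBytes: 1024 * 1024,              // cap the size of a fetch response
    connectionString: '127.0.0.1:9092'  // initial broker(s) to connect to
});
```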
### Group Consumer (new unified consumer API)
Specify an assignment strategy (or use the no-kafka built-in consistent assignment strategy) and subscribe by specifying only topics. The elected group leader will automatically assign partitions among all group members.
Example:
```javascript
var Kafka = require('no-kafka');
var consumer = new Kafka.GroupConsumer();

var strategies = [{
    strategy: 'TestStrategy',
    subscriptions: ['kafka-test-topic'],
    metadata: {
        id: process.argv[2] || 'consumer_1',
        weight: 50
    }
}];

consumer.on('data', function (messageSet, topic, partition) {
    messageSet.forEach(function (m) {
        console.log(topic, partition, m.offset, m.message.value.toString('utf8'));
        consumer.commitOffset({topic: topic, partition: partition, offset: m.offset, metadata: 'optional'});
    });
});

return consumer.init(strategies).then(function () {
    // consumer has joined the group and will start emitting 'data' events
});
```
#### Assignment strategies
no-kafka provides two built-in strategies:
- `GroupConsumer.ConsistentAssignment` - based on a consistent hash ring; provides consistent assignment across consumers in a group based on the supplied `metadata.id` and `metadata.weight` options
- `GroupConsumer.RoundRobinAssignment` - simple range assignment
Using `GroupConsumer.ConsistentAssignment` (default in no-kafka):
```javascript
var strategies = {
    strategy: 'TestStrategy',
    subscriptions: ['kafka-test-topic'],
    metadata: {
        id: process.argv[2] || 'consumer_1',
        weight: 50
    }
};
```
Note that each consumer in a group should have its own, consistent `metadata.id`.
Using `GroupConsumer.RoundRobinAssignment`:
```javascript
var strategies = {
    strategy: 'TestStrategy',
    subscriptions: ['kafka-test-topic'],
    fn: Kafka.GroupConsumer.RoundRobinAssignment
};
```
You can also write your own assignment strategy function and provide it as the `fn` option of the strategy item.
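As an illustration only, a custom strategy could spread partitions across group members round-robin style. The function name and the argument/return shapes below are assumptions made for this sketch, not no-kafka's documented contract; check the built-in strategies in the no-kafka source for the exact signature expected by `fn`:

```javascript
// Hypothetical sketch of a custom assignment strategy (argument and
// return shapes are assumed here, not taken from no-kafka's API).
// members    - array of group member ids
// partitions - array of {topic, partition} items to distribute
function myCustomAssignment(members, partitions) {
    var assignment = {};
    members.forEach(function (memberId) {
        assignment[memberId] = [];
    });
    partitions.forEach(function (p, index) {
        // rotate through the members so partitions are spread evenly
        assignment[members[index % members.length]].push(p);
    });
    return assignment;
}

var strategies = {
    strategy: 'MyStrategy',
    subscriptions: ['kafka-test-topic'],
    fn: myCustomAssignment // plug the custom function in as `fn`
};
```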