no-kafka
no-kafka is an Apache Kafka 0.9 client for Node.js with support for the new unified consumer API.
All methods return a promise.
Using
kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic kafka-test-topic --partitions 3 --replication-factor 1
npm install no-kafka
Producer
Example:
var Kafka = require('no-kafka');
var producer = new Kafka.Producer();
return producer.init().then(function(){
return producer.send({
topic: 'kafka-test-topic',
partition: 0,
message: {
value: 'Hello!'
}
});
})
.then(function (result) {
});
Send with up to 2 attempts in total, waiting 100ms between them:
return producer.send(messages, {
retries: {
attempts: 2,
delay: 100
}
});
Accumulate messages into a single batch until their total size reaches 1024 bytes or the 100ms timeout expires (these options override the Producer constructor options):
producer.send(messages, {
batch: {
size: 1024,
maxWait: 100
}
});
producer.send(messages, {
batch: {
size: 1024,
maxWait: 100
}
});
Note that messages sent with different send() options are grouped into separate batches:
producer.send(messages, {
batch: {
size: 1024,
maxWait: 100
},
codec: Kafka.COMPRESSION_GZIP
});
producer.send(messages, {
batch: {
size: 1024,
maxWait: 100
},
codec: Kafka.COMPRESSION_SNAPPY
});
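The batching rules above can be modelled with a small sketch. This is only an illustration of the described behaviour, not no-kafka's internal implementation: planBatches is a hypothetical helper, the codec values are plain strings standing in for the Kafka.COMPRESSION_* constants, and the maxWait timer is not simulated (whatever is still pending would be sent when it expires).

```javascript
// Illustrative model of the batching rules; not part of no-kafka.
// Each entry in `sends` is { value: string, options: { batch: { size, maxWait }, ... } }.
function planBatches(sends) {
  var open = {};     // batches still accumulating, keyed by their serialized options
  var flushed = [];  // batches that reached their size threshold

  sends.forEach(function (send) {
    // Sends with different options never share a batch.
    // (JSON.stringify is order-sensitive, which is good enough for a sketch.)
    var key = JSON.stringify(send.options);
    var batch = open[key] || (open[key] = { options: send.options, values: [], bytes: 0 });

    batch.values.push(send.value);
    batch.bytes += Buffer.byteLength(send.value, 'utf8');

    // Flush as soon as the accumulated size reaches batch.size.
    if (batch.bytes >= send.options.batch.size) {
      flushed.push(batch);
      delete open[key];
    }
  });

  // Whatever is left would be sent once maxWait expires.
  var pending = Object.keys(open).map(function (key) { return open[key]; });
  return { flushed: flushed, pending: pending };
}

// Usage: two option sets produce two separate pending batches.
var gzipOptions = { batch: { size: 1024, maxWait: 100 }, codec: 'gzip' };
var snappyOptions = { batch: { size: 1024, maxWait: 100 }, codec: 'snappy' };
var plan = planBatches([
  { value: 'first', options: gzipOptions },
  { value: 'second', options: snappyOptions },
  { value: 'third', options: gzipOptions }
]);
console.log(plan.pending.length, plan.flushed.length);
```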
Send message with Snappy compression:
return producer.send(messages, { codec: Kafka.COMPRESSION_SNAPPY });
Producer options:
requiredAcks - acknowledgements required for a produce request. If 0, the server will not send any response. If 1 (default), the server waits until the data is written to the local log before sending a response. If -1, the server blocks until the message is committed by all in-sync replicas before sending a response. For any number > 1, the server blocks until that many acknowledgements have occurred (but it will never wait for more acknowledgements than there are in-sync replicas).
timeout - timeout in ms for a produce request
clientId - ID of this client, defaults to 'no-kafka-client'
connectionString - comma-delimited list of initial brokers, defaults to '127.0.0.1:9092'
partitioner - function used to determine the topic partition for a message. If the message already specifies a partition, the partitioner won't be used. The partitioner function receives 3 arguments: the topic name, an array with the topic partitions, and the message (useful to partition by key, etc.). The partitioner can be sync or async (return a Promise).
retries - controls the number of attempts and the delay between them when a produce request fails
  attempts - total number of attempts to send the message, defaults to 3
  delay - delay in ms between retries, defaults to 1000
codec - compression codec, one of Kafka.COMPRESSION_NONE, Kafka.COMPRESSION_SNAPPY, Kafka.COMPRESSION_GZIP
batch - controls batching (grouping) of requests
  size - group messages together into a single batch until their total size exceeds this value, defaults to 16384 bytes. Set to 0 to disable batching.
  maxWait - send grouped messages after this number of milliseconds even if their total size doesn't exceed batch.size yet, defaults to 10ms. Set to 0 to disable batching.
asyncCompression - boolean, use asynchronous compression instead of synchronous, defaults to false
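As an illustration of the partitioner option, here is a minimal sketch of a key-based partitioner that follows the three-argument signature described above. Several details are assumptions rather than documented behaviour: that the message carries a key property, and that entries of the partitions array are either plain partition numbers or objects with a partitionId field. hashKey and keyPartitioner are hypothetical names; check the shape of the partitions array in your version before relying on this.

```javascript
// Simple djb2-style string hash, kept within unsigned 32-bit range (illustrative only).
function hashKey(key) {
  var hash = 5381;
  for (var i = 0; i < key.length; i++) {
    hash = ((hash * 33) ^ key.charCodeAt(i)) >>> 0;
  }
  return hash;
}

// Picks a partition from the message key so that equal keys map to the same partition.
function keyPartitioner(topic, partitions, message) {
  // Assumption: messages may carry a `key`; without one, fall back to the first entry.
  var index = message.key === undefined || message.key === null
    ? 0
    : hashKey(String(message.key)) % partitions.length;
  var entry = partitions[index];
  // Assumption: entries are plain numbers or objects exposing `partitionId`.
  return typeof entry === 'number' ? entry : entry.partitionId;
}

// Intended use (not executed here, since it needs a running broker):
//   var producer = new Kafka.Producer({ partitioner: keyPartitioner });
console.log(keyPartitioner('kafka-test-topic', [0, 1, 2], { key: 'user-42', value: 'Hello!' }));
```

Because the function is synchronous and deterministic, messages with the same key stay in order within one partition as long as the partition count does not change.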
SimpleConsumer
Manually specify topic, partition and offset when subscribing. Suitable for simple use cases.
Example:
var consumer = new Kafka.SimpleConsumer();
var dataHandler = function (messageSet, topic, partition) {
messageSet.forEach(function (m) {
console.log(topic, partition, m.offset, m.message.value.toString('utf8'));
});
};
return consumer.init().then(function () {
return consumer.subscribe('kafka-test-topic', [0, 1], dataHandler);
});
Subscribe (or change subscription) to specific offset and limit maximum received MessageSet size:
consumer.subscribe('kafka-test-topic', 0, {offset: 20, maxBytes: 30}, dataHandler)
Subscribe to latest or earliest offsets in the topic/partition:
consumer.subscribe('kafka-test-topic', 0, {time: Kafka.LATEST_OFFSET}, dataHandler)
consumer.subscribe('kafka-test-topic', 0, {time: Kafka.EARLIEST_OFFSET}, dataHandler)
Subscribe to all partitions in a topic:
consumer.subscribe('kafka-test-topic', dataHandler)
Commit offset(s) (V0, Kafka saves these commits to Zookeeper):
consumer.commitOffset([
{
topic: 'kafka-test-topic',
partition: 0,
offset: 1
},
{
topic: 'kafka-test-topic',
partition: 1,
offset: 2
}
])
Fetch committed offset(s):
consumer.fetchOffset([
{
topic: 'kafka-test-topic',
partition: 0
},
{
topic: 'kafka-test-topic',
partition: 1
}
]).then(function (result) {
});
SimpleConsumer options
groupId - group ID for committing and fetching offsets, defaults to 'no-kafka-group-v0'
timeout - timeout for fetch requests, defaults to 100ms
idleTimeout - timeout between fetch calls, defaults to 1000ms
minBytes - minimum number of bytes to wait for from Kafka before returning the fetch call, defaults to 1 byte
maxBytes - maximum size of messages in a fetch response
clientId - ID of this client, defaults to 'no-kafka-client'
connectionString - comma-delimited list of initial brokers, defaults to '127.0.0.1:9092'
recoveryOffset - recovery position (time) used to recover the subscription in case of an OffsetOutOfRange error, defaults to Kafka.LATEST_OFFSET
asyncCompression - boolean, use asynchronous decompression instead of synchronous, defaults to false
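The connectionString format is a comma-delimited list of host:port pairs. The following sketch shows one way such a string could be split into broker entries, for example to validate configuration before constructing a client. parseConnectionString is a hypothetical helper written for this illustration and is not part of no-kafka.

```javascript
// Splits 'host1:port1,host2:port2' into [{ host, port }, ...].
// Hypothetical helper for illustration; no-kafka parses the string internally.
function parseConnectionString(connectionString) {
  return connectionString.split(',').map(function (entry) {
    var trimmed = entry.trim();
    var separator = trimmed.lastIndexOf(':');
    if (separator === -1) {
      throw new Error('Expected host:port, got "' + trimmed + '"');
    }
    return {
      host: trimmed.slice(0, separator),
      port: parseInt(trimmed.slice(separator + 1), 10)
    };
  });
}

// Usage with the default broker address from the options above.
console.log(parseConnectionString('127.0.0.1:9092'));
```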
GroupConsumer (new unified consumer API)
Specify an assignment strategy (or use one of no-kafka's built-in consistent or round robin assignment strategies) and subscribe by specifying only topics. The elected group leader automatically assigns partitions among all group members.
Example:
var Promise = require('bluebird');
var consumer = new Kafka.GroupConsumer();
var dataHandler = function (messageSet, topic, partition) {
return Promise.map(messageSet, function (m){
console.log(topic, partition, m.offset, m.message.value.toString('utf8'));
return consumer.commitOffset({topic: topic, partition: partition, offset: m.offset, metadata: 'optional'});
});
};
var strategies = [{
strategy: 'TestStrategy',
subscriptions: ['kafka-test-topic'],
handler: dataHandler
}];
consumer.init(strategies);
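The handler above commits every message individually. When that is more commit traffic than needed, one alternative is to commit only the highest offset of each message set. The sketch below builds that single commit entry; it assumes the message shape used in the example (m.offset) and the commitOffset argument shape shown above, and lastOffsetCommit is a hypothetical helper name.

```javascript
// Builds one commit entry for the highest offset in a message set.
// Returns null for an empty set so the caller can skip the commit.
function lastOffsetCommit(messageSet, topic, partition) {
  if (messageSet.length === 0) {
    return null;
  }
  var highest = messageSet.reduce(function (max, m) {
    return m.offset > max ? m.offset : max;
  }, messageSet[0].offset);
  return { topic: topic, partition: partition, offset: highest };
}

// Intended use inside a data handler (not executed here, since it needs a running broker):
//   var commit = lastOffsetCommit(messageSet, topic, partition);
//   return commit ? consumer.commitOffset(commit) : undefined;
var sampleSet = [
  { offset: 5, message: { value: Buffer.from('a') } },
  { offset: 6, message: { value: Buffer.from('b') } }
];
console.log(lastOffsetCommit(sampleSet, 'kafka-test-topic', 0));
```

The trade-off is that a crash in the middle of a set causes the whole set to be redelivered, so handlers should tolerate duplicates.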
Assignment strategies
no-kafka provides three built-in strategies:
Kafka.WeightedRoundRobinAssignment - weighted round robin assignment (based on wrr-pool).
Kafka.ConsistentAssignment - based on a consistent hash ring, so it provides consistent assignment across consumers in a group based on the supplied metadata.id and metadata.weight options.
Kafka.RoundRobinAssignment - simple assignment strategy (default).
Using Kafka.WeightedRoundRobinAssignment:
var strategies = {
strategy: 'TestStrategy',
subscriptions: ['kafka-test-topic'],
metadata: {
weight: 4
},
fn: Kafka.WeightedRoundRobinAssignment,
handler: dataHandler
};
Using Kafka.ConsistentAssignment:
var strategies = {
strategy: 'TestStrategy',
subscriptions: ['kafka-test-topic'],
metadata: {
id: process.argv[2] || 'consumer_1',
weight: 50
},
fn: Kafka.ConsistentAssignment,
handler: dataHandler
};
Note that each consumer in a group should have its own metadata.id, and that id should stay the same across restarts.
Using Kafka.RoundRobinAssignment (default in no-kafka):
var strategies = {
strategy: 'TestStrategy',
subscriptions: ['kafka-test-topic'],
handler: dataHandler
};
You can also write your own assignment strategy function and provide it as the fn option of the strategy item.
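To make the idea of an assignment strategy concrete, here is a sketch of plain round robin distribution of partitions among group members. The input and output shapes are hypothetical: this document does not describe the arguments no-kafka passes to fn or the result it expects, so roundRobinAssign illustrates the technique only and should not be passed as fn without checking the library source.

```javascript
// Distributes partitions among members in round robin order.
// Hypothetical shapes: memberIds is an array of strings, partitions an array of numbers,
// and the result maps each member id to the partitions it owns.
function roundRobinAssign(memberIds, partitions) {
  var assignment = {};
  memberIds.forEach(function (id) {
    assignment[id] = [];
  });
  partitions.forEach(function (partition, index) {
    var owner = memberIds[index % memberIds.length];
    assignment[owner].push(partition);
  });
  return assignment;
}

// Usage: three partitions shared by two group members.
console.log(roundRobinAssign(['consumer_1', 'consumer_2'], [0, 1, 2]));
```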
GroupConsumer options
groupId - group ID for committing and fetching offsets, defaults to 'no-kafka-group-v0.9'
timeout - timeout for fetch requests, defaults to 100ms
idleTimeout - timeout between fetch calls, defaults to 1000ms
minBytes - minimum number of bytes to wait for from Kafka before returning the fetch call, defaults to 1 byte
maxBytes - maximum size of messages in a fetch response
clientId - ID of this client, defaults to 'no-kafka-client'
connectionString - comma-delimited list of initial brokers, defaults to '127.0.0.1:9092'
sessionTimeout - session timeout in ms, min 6000, max 30000, defaults to 15000
heartbeatTimeout - delay between heartbeat requests in ms, defaults to 1000
retentionTime - offset retention time in ms, defaults to 1 day (24 * 3600 * 1000)
startingOffset - starting position (time) when there is no committed offset, defaults to Kafka.LATEST_OFFSET
recoveryOffset - recovery position (time) used to recover the subscription in case of an OffsetOutOfRange error, defaults to Kafka.LATEST_OFFSET
asyncCompression - boolean, use asynchronous decompression instead of synchronous, defaults to false
GroupAdmin (consumer groups API)
Offers two methods:
listGroups - list existing consumer groups
describeGroup - describe an existing group by its ID
Example:
var admin = new Kafka.GroupAdmin();
return admin.init().then(function(){
return admin.listGroups().then(function(groups){
return admin.describeGroup('no-kafka-admin-test-group').then(function(group){
})
});
});
Compression
no-kafka supports both Snappy and Gzip compression.
Enable compression in Producer:
var Kafka = require('no-kafka');
var producer = new Kafka.Producer({
clientId: 'producer',
codec: Kafka.COMPRESSION_SNAPPY
});
Alternatively, send individual messages with a specified compression codec (this overrides the codec set in the constructor):
return producer.send({
topic: 'kafka-test-topic',
partition: 0,
message: { value: 'p00' }
}, { codec: Kafka.COMPRESSION_SNAPPY })
By default no-kafka uses synchronous compression and decompression (synchronous Gzip is not available in node < 0.11).
Enable async compression/decompression with the asyncCompression option:
Producer:
var producer = new Kafka.Producer({
clientId: 'producer',
asyncCompression: true,
codec: Kafka.COMPRESSION_SNAPPY
});
Consumer:
var consumer = new Kafka.SimpleConsumer({
idleTimeout: 100,
clientId: 'simple-consumer',
asyncCompression: true
});
Logging
You can differentiate messages from several producer/consumer instances by providing a unique clientId in the options:
var consumer1 = new Kafka.GroupConsumer({
clientId: 'group-consumer-1'
});
var consumer2 = new Kafka.GroupConsumer({
clientId: 'group-consumer-2'
});
=>
2016-01-12T07:41:57.884Z INFO group-consumer-1 ....
2016-01-12T07:41:57.884Z INFO group-consumer-2 ....
Change the logging level:
var consumer = new Kafka.GroupConsumer({
clientId: 'group-consumer',
logger: {
logLevel: 1
}
});
Send log messages to Logstash server(s) via UDP:
var consumer = new Kafka.GroupConsumer({
clientId: 'group-consumer',
logger: {
logstash: {
enabled: true,
connectionString: '10.0.1.1:9999,10.0.1.2:9999',
app: 'myApp-kafka-consumer'
}
}
});
You can overwrite the function that outputs messages to stdout/stderr:
var consumer = new Kafka.GroupConsumer({
clientId: 'group-consumer',
logger: {
logFunction: console.log
}
});
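Since console.log is a valid logFunction, any function with the same variadic calling convention should work. As a sketch under that assumption, the following collects log lines in memory, which can be handy in tests. createMemoryLogger is a hypothetical helper, and the exact arguments no-kafka passes to logFunction are not specified here.

```javascript
// Creates a log function that stores each call as one line instead of printing it.
// Assumption: logFunction is invoked like console.log, with any number of arguments.
function createMemoryLogger() {
  var lines = [];
  return {
    lines: lines,
    logFunction: function () {
      var parts = Array.prototype.slice.call(arguments).map(String);
      lines.push(parts.join(' '));
    }
  };
}

// Intended use (not executed here, since it needs a running broker):
//   var memory = createMemoryLogger();
//   var consumer = new Kafka.GroupConsumer({
//     clientId: 'group-consumer',
//     logger: { logFunction: memory.logFunction }
//   });
var memory = createMemoryLogger();
memory.logFunction('INFO', 'group-consumer', 'started');
console.log(memory.lines);
```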
License: MIT