
KafkaJS

A modern Apache Kafka client for Node.js. This library is compatible with Kafka 0.10+.
KafkaJS is battle-tested and ready for production.
Features
- Producer
- Consumer groups with pause, resume, and seek
- GZIP compression
- Plain, SSL and SASL_SSL implementations
Installation
npm install kafkajs
Usage
Setting up the Client
The client must be configured with at least one broker. The brokers on the list are considered seed brokers and are only used to bootstrap the client and load initial metadata.
const { Kafka } = require('kafkajs')
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['kafka1:9092', 'kafka2:9092']
})
SSL
The ssl option can be used to configure the TLS sockets. The options are passed directly to tls.connect and used to create the TLS Secure Context; all options are accepted.
const fs = require('fs')
new Kafka({
clientId: 'my-app',
brokers: ['kafka1:9092', 'kafka2:9092'],
ssl: {
rejectUnauthorized: false,
ca: [fs.readFileSync('/my/custom/ca.crt', 'utf-8')],
key: fs.readFileSync('/my/custom/client-key.pem', 'utf-8'),
cert: fs.readFileSync('/my/custom/client-cert.pem', 'utf-8')
},
})
Take a look at TLS create secure context for more information. NODE_EXTRA_CA_CERTS can be used to add custom CAs. Use ssl: true if you don't have any extra configurations and want to enable SSL.
SASL
Kafka has support for using SASL to authenticate clients. The sasl option can be used to configure the authentication mechanism. Currently, KafkaJS only supports the PLAIN mechanism.
new Kafka({
clientId: 'my-app',
brokers: ['kafka1:9092', 'kafka2:9092'],
sasl: {
mechanism: 'plain',
username: 'my-username',
password: 'my-password'
},
})
It is highly recommended that you use SSL for encryption when using PLAIN.
Connection Timeout
Time in milliseconds to wait for a successful connection. The default value is 1000.
new Kafka({
clientId: 'my-app',
brokers: ['kafka1:9092', 'kafka2:9092'],
connectionTimeout: 3000
})
Default Retry
The retry option can be used to set the default configuration. The retry mechanism uses a randomization function that grows exponentially. The configuration will be used to retry connections and API calls to Kafka (when using producers or consumers).
If the max number of retries is exceeded, the retrier will throw KafkaJSNumberOfRetriesExceeded and interrupt. Producers will bubble up the error to the user code; consumers will wait the retry time attached to the exception (based on the number of attempts) and perform a full restart.
Available options:
option | description | default |
---|---|---|
maxRetryTime | Maximum wait time for a retry in milliseconds | 30000 |
initialRetryTime | Initial value used to calculate the retry in milliseconds (This is still randomized following the randomization factor) | 300 |
factor | Randomization factor | 0.2 |
multiplier | Exponential factor | 2 |
retries | Max number of retries per call | 5 |
Example:
new Kafka({
clientId: 'my-app',
brokers: ['kafka1:9092', 'kafka2:9092'],
retry: {
initialRetryTime: 100,
retries: 8
}
})
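To get a feel for how these options interact, the sketch below computes one possible sequence of wait times. It illustrates exponential backoff with a randomization factor and is not KafkaJS's internal implementation; the name `sketchRetryTimes` and the exact jitter formula are assumptions.

```javascript
// Hypothetical helper: illustrates exponential backoff with jitter.
// The jitter formula is an assumption, not taken from the KafkaJS source.
const sketchRetryTimes = (
  { initialRetryTime = 300, factor = 0.2, multiplier = 2, maxRetryTime = 30000, retries = 5 } = {},
  random = Math.random
) => {
  const times = []
  let base = initialRetryTime
  for (let attempt = 0; attempt < retries; attempt++) {
    // Spread the base value by +/- factor (for example +/- 20% when factor is 0.2)
    const jitter = 1 + factor * (2 * random() - 1)
    // Never wait longer than maxRetryTime
    times.push(Math.min(maxRetryTime, Math.round(base * jitter)))
    base = base * multiplier
  }
  return times
}

// With the documented defaults; values vary per run because of the jitter
console.log(sketchRetryTimes())
```

Passing a fixed `random` function (for example `() => 0.5`) removes the jitter, which makes the growth from `initialRetryTime` and the cap at `maxRetryTime` easy to see.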
Logger
KafkaJS has a built-in STDOUT logger which outputs JSON. It also accepts a custom log creator which allows you to integrate your favorite logger library.
There are 5 log levels available: NOTHING, ERROR, WARN, INFO, and DEBUG. INFO is configured by default.
How to configure the log level?
const { Kafka, logLevel } = require('kafkajs')
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['kafka1:9092', 'kafka2:9092'],
logLevel: logLevel.ERROR
})
The environment variable KAFKAJS_LOG_LEVEL can also be used, and it takes precedence over the configuration in code. Example:
KAFKAJS_LOG_LEVEL=info node code.js
NOTE: for more information on how to customize your logs, take a look at Custom logger
Producing Messages to Kafka
To publish messages to Kafka you have to create a producer. With a client in hand, call the producer function, for example:
const producer = kafka.producer()
By default, the producer is configured to distribute the messages with the following logic:
- If a partition is specified in the message, use it
- If no partition is specified but a key is present choose a partition based on a hash (murmur2) of the key
- If no partition or key is present choose a partition in a round-robin fashion
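The three rules above can be sketched as a partitioner-shaped function. This is not the built-in partitioner: `sketchHash` is a simple stand-in for murmur2, and `createSketchPartitioner` is a hypothetical name. The argument shape (`partitionMetadata`, `message`) follows the custom partitioner interface described in this document.

```javascript
// Stand-in hash for illustration only; the built-in logic uses murmur2.
const sketchHash = key => {
  let hash = 0
  for (const char of String(key)) {
    hash = (hash * 31 + char.codePointAt(0)) >>> 0
  }
  return hash
}

// Hypothetical partitioner following the three rules listed above.
const createSketchPartitioner = () => {
  let counter = 0
  return ({ partitionMetadata, message }) => {
    const partitionIds = partitionMetadata.map(p => p.partitionId).sort((a, b) => a - b)
    // 1. An explicit partition wins
    if (message.partition !== undefined && message.partition !== null) {
      return message.partition
    }
    // 2. Otherwise hash the key, so equal keys map to the same partition
    if (message.key !== undefined && message.key !== null) {
      return partitionIds[sketchHash(message.key) % partitionIds.length]
    }
    // 3. Otherwise rotate through the partitions
    return partitionIds[counter++ % partitionIds.length]
  }
}

const partitioner = createSketchPartitioner()
const partitionMetadata = [
  { partitionId: 1, leader: 1 },
  { partitionId: 2, leader: 2 },
  { partitionId: 0, leader: 0 }
]
console.log(partitioner({ partitionMetadata, message: { value: 'a', partition: 2 } })) // 2
```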
The method send is used to publish messages to the Kafka cluster.
const producer = kafka.producer()
async () => {
await producer.connect()
await producer.send({
topic: 'topic-name',
messages: [
{ key: 'key1', value: 'hello world' },
{ key: 'key2', value: 'hey hey!' }
],
})
await producer.disconnect()
}
Example with a defined partition:
async () => {
await producer.send({
topic: 'topic-name',
messages: [
{ key: 'key1', value: 'hello world', partition: 0 },
{ key: 'key2', value: 'hey hey!', partition: 1 }
],
})
}
The method send has the following signature:
await producer.send({
topic: <String>,
messages: <Message[]>,
acks: <Number>,
timeout: <Number>,
compression: <CompressionTypes>,
})
property | description |
---|---|
topic | topic name |
messages | An array of objects with "key" and "value", example: [{ key: 'my-key', value: 'my-value'}] |
acks | Control the number of required acks. -1 = all replicas must acknowledge (default); 0 = no acknowledgments; 1 = only waits for the leader to acknowledge |
timeout | The time to await a response in ms. Default value 30000 |
compression | Compression codec. Default value CompressionTypes.None |
Compression
KafkaJS only supports GZIP natively; the library aims to have a small footprint and as few dependencies as possible. The remaining codecs can be implemented using existing libraries. Plugins providing other codecs might exist in the future, but there are no plans to implement them in the near term.
GZIP
const { CompressionTypes } = require('kafkajs')
async () => {
await producer.send({
topic: 'topic-name',
compression: CompressionTypes.GZIP,
messages: [
{ key: 'key1', value: 'hello world' },
{ key: 'key2', value: 'hey hey!' }
],
})
}
The consumers know how to decompress GZIP, so no further work is necessary.
Adding Snappy or LZ4 codecs
A codec is an object with two async functions: compress and decompress.
Example using the snappy package:
const { promisify } = require('util')
const snappy = require('snappy')
const snappyCompress = promisify(snappy.compress)
const snappyDecompress = promisify(snappy.uncompress)
const SnappyCodec = {
async compress(encoder) {
return snappyCompress(encoder.buffer)
},
async decompress(buffer) {
return snappyDecompress(buffer)
}
}
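Before registering a codec it can be useful to check that `decompress` reverses `compress`. The sketch below does that with Node's built-in `zlib` standing in for a third-party library, so it runs without extra dependencies. `GzipLikeCodec` and `roundTrips` are hypothetical names, and the `{ buffer }` argument mirrors the `encoder.buffer` usage in the Snappy example. It only demonstrates the codec shape and is not meant to be registered.

```javascript
const zlib = require('zlib')
const { promisify } = require('util')

const gzip = promisify(zlib.gzip)
const gunzip = promisify(zlib.gunzip)

// Hypothetical codec with the same shape as SnappyCodec above
const GzipLikeCodec = {
  async compress(encoder) {
    return gzip(encoder.buffer)
  },
  async decompress(buffer) {
    return gunzip(buffer)
  }
}

// Resolves to true when decompress(compress(x)) yields the original bytes
const roundTrips = async (codec, payload) => {
  const compressed = await codec.compress({ buffer: payload })
  const restored = await codec.decompress(compressed)
  return Buffer.compare(restored, payload) === 0
}

roundTrips(GzipLikeCodec, Buffer.from('hello world')).then(ok => {
  console.log(ok ? 'codec round-trips' : 'codec is broken')
})
```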
Then, to add this implementation:
const { CompressionTypes, CompressionCodecs } = require('kafkajs')
CompressionCodecs[CompressionTypes.Snappy] = SnappyCodec
The new codec can now be used with the send method. Example:
async () => {
await producer.send({
topic: 'topic-name',
compression: CompressionTypes.Snappy,
messages: [
{ key: 'key1', value: 'hello world' },
{ key: 'key2', value: 'hey hey!' }
],
})
}
Custom partitioner
It's possible to assign a custom partitioner to the producer. A partitioner is a function which returns another function responsible for the partition selection, something like this:
const MyPartitioner = () => {
return ({ topic, partitionMetadata, message }) => {
return 0
}
}
partitionMetadata is an array of partitions with the following structure:
{ partitionId: <NodeId>, leader: <NodeId> }
Example:
[
{ partitionId: 1, leader: 1 },
{ partitionId: 2, leader: 2 },
{ partitionId: 0, leader: 0 }
]
To configure your partitioner, use the option createPartitioner.
kafka.producer({ createPartitioner: MyPartitioner })
Retry
The option retry can be used to customize the configuration for the producer.
Take a look at Retry for more information.
Consuming messages from Kafka
Consumer groups allow a group of machines or processes to coordinate access to a list of topics, distributing the load among the consumers. When a consumer fails the load is automatically distributed to other members of the group. Consumer groups must have unique group ids.
Creating the consumer:
const consumer = kafka.consumer({ groupId: 'my-group' })
Subscribing to some topics:
async () => {
await consumer.connect()
await consumer.subscribe({ topic: 'topic-A' })
await consumer.subscribe({ topic: 'topic-B' })
}
KafkaJS offers you two ways to process your data: eachMessage and eachBatch.
eachMessage
This handler provides a convenient API, feeding your function one message at a time. The handler will automatically commit your offsets and heartbeat at the configured interval. If you are just looking to get started with Kafka consumers this should be your first solution.
async () => {
await consumer.connect()
await consumer.subscribe({ topic: 'topic-name' })
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log({
key: message.key.toString(),
value: message.value.toString()
})
},
})
// Call disconnect only when the application is shutting down
await consumer.disconnect()
}
eachBatch
Some use cases can be optimized by dealing with batches rather than single messages. This handler will feed your function batches and some utility functions to give your code more flexibility. Be aware that using eachBatch is considered a more advanced use case since you will have to understand how session timeouts and heartbeats are connected. All resolved offsets will be automatically committed after the function is executed.
await consumer.run({
eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning }) => {
for (let message of batch.messages) {
console.log({
topic: batch.topic,
partition: batch.partition,
highWatermark: batch.highWatermark,
message: {
offset: message.offset,
key: message.key.toString(),
value: message.value.toString()
}
})
await resolveOffset(message.offset)
await heartbeat()
}
},
})
// Call disconnect only when the application is shutting down
await consumer.disconnect()
highWatermark is the last committed offset within the topic partition. It can be useful for calculating lag.
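As a sketch of the lag calculation, the helper below subtracts a message offset from the high watermark. It assumes both values arrive as decimal strings or integers (hence `BigInt`), and `lagFor` is a hypothetical name. Depending on whether the watermark points at the last message or at the next offset, the result may be off by one.

```javascript
// Hypothetical helper: distance between a message and the high watermark.
// BigInt accepts decimal strings and integers, and avoids precision loss on large offsets.
const lagFor = (highWatermark, offset) => BigInt(highWatermark) - BigInt(offset)

console.log(lagFor('1500', '1200').toString()) // 300
```

Inside `eachBatch` this could be called as `lagFor(batch.highWatermark, message.offset)`.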
resolveOffset is used to mark the message as processed. In case of errors, the consumer will automatically commit the resolved offsets. With the default configuration, the function can't be interrupted without ignoring the unprocessed messages; this happens because after the function is executed the last offset of the batch is automatically resolved and committed. For fine-grained control of message processing, it's possible to disable the auto-resolve by setting the property eachBatchAutoResolve to false. Example:
consumer.run({
  eachBatchAutoResolve: false,
  // The handler must be async because it awaits inside the loop
  eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning }) => {
    for (let message of batch.messages) {
      // Stop early when the consumer is shutting down
      if (!isRunning()) break
      await processMessage(message)
      await resolveOffset(message.offset)
      await heartbeat()
    }
  }
})
In this example, if the consumer is shutting down in the middle of the batch, the remaining messages won't be resolved and therefore not committed.
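The effect can be simulated without a broker. In the sketch below, `fakeBatch`, `processMessage` and the stubbed helpers are hypothetical stand-ins for what the consumer would pass to `eachBatch`; the loop is the same as in the example above.

```javascript
// Handler with the same loop as the example above
const eachBatch = async ({ batch, resolveOffset, heartbeat, isRunning }) => {
  for (const message of batch.messages) {
    if (!isRunning()) break
    await processMessage(message)
    await resolveOffset(message.offset)
    await heartbeat()
  }
}

// --- Stubs for the simulation (not KafkaJS APIs) ---
const resolved = []
let running = true
const processMessage = async message => {
  // Pretend a shutdown is requested while handling offset '1'
  if (message.offset === '1') running = false
}
const fakeBatch = { messages: [{ offset: '0' }, { offset: '1' }, { offset: '2' }] }

const simulation = eachBatch({
  batch: fakeBatch,
  resolveOffset: async offset => { resolved.push(offset) },
  heartbeat: async () => {},
  isRunning: () => running
}).then(() => {
  console.log(resolved) // [ '0', '1' ]
  return resolved
})
```

Offset '2' is never resolved, so in a real consumer it would not be committed and would be delivered again after a restart.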
Options
kafka.consumer({
groupId: <String>,
partitionAssigners: <Array>,
sessionTimeout: <Number>,
heartbeatInterval: <Number>,
maxBytesPerPartition: <Number>,
minBytes: <Number>,
maxBytes: <Number>,
maxWaitTimeInMs: <Number>,
retry: <Object>,
})
option | description | default |
---|---|---|
partitionAssigners | List of partition assigners | [PartitionAssigners.roundRobin] |
sessionTimeout | Timeout in milliseconds used to detect failures. The consumer sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this consumer from the group and initiate a rebalance | 30000 |
heartbeatInterval | The expected time in milliseconds between heartbeats to the consumer coordinator. Heartbeats are used to ensure that the consumer's session stays active. The value must be set lower than session timeout | 3000 |
maxBytesPerPartition | The maximum amount of data per-partition the server will return. This size must be at least as large as the maximum message size the server allows or else it is possible for the producer to send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large message on a certain partition | 1048576 (1MB) |
minBytes | Minimum amount of data the server should return for a fetch request, otherwise wait up to maxWaitTimeInMs for more data to accumulate | 1 |
maxBytes | Maximum amount of bytes to accumulate in the response. Supported by Kafka >= 0.10.1.0 | 10485760 (10MB) |
maxWaitTimeInMs | The maximum amount of time in milliseconds the server will block before answering the fetch request if there isn’t sufficient data to immediately satisfy the requirement given by minBytes | 5000 |
retry | Take a look at Retry for more information | { retries: 10 } |
Pause & Resume
In order to pause and resume consuming from one or more topics, the Consumer provides the methods pause and resume. Note that pausing a topic means that it won't be fetched in the next cycle. You may still receive messages for the topic within the current batch.
await consumer.connect()
await consumer.subscribe({ topic: 'jobs' })
await consumer.subscribe({ topic: 'pause' })
await consumer.subscribe({ topic: 'resume' })
await consumer.run({ eachMessage: async ({ topic, message }) => {
switch(topic) {
case 'jobs':
doSomeWork(message)
break
case 'pause':
consumer.pause([{ topic: 'jobs'}])
break
case 'resume':
consumer.resume([{ topic: 'jobs' }])
break
}
}})
Calling pause with a topic that the consumer is not subscribed to is a no-op, as is calling resume with a topic that is not paused.
Seek
To move the offset position in a topic/partition, the Consumer provides the method seek. This method has to be called after the consumer is initialized and is running (after consumer#run).
await consumer.connect()
await consumer.subscribe({ topic: 'example' })
consumer.run({ eachMessage: async ({ topic, message }) => true })
consumer.seek({ topic: 'example', partition: 0, offset: 12384 })
Custom partition assigner
It's possible to configure the strategy the consumer will use to distribute partitions amongst the consumer group. KafkaJS has a round robin assigner configured by default.
A partition assigner is a function which returns an object with the following interface:
const MyPartitionAssigner = ({ cluster }) => ({
name: 'MyPartitionAssigner',
version: 1,
async assign({ members, topics }) {},
protocol({ topics }) {}
})
The method assign has to return an assignment plan with partitions per topic. A partition plan consists of a list of memberId and memberAssignment. The member assignment has to be encoded; use the MemberAssignment utility for that. Example:
const { AssignerProtocol: { MemberAssignment } } = require('kafkajs')
const MyPartitionAssigner = ({ cluster }) => ({
version: 1,
async assign({ members, topics }) {
return myCustomAssignmentArray.map(memberId => ({
memberId,
memberAssignment: MemberAssignment.encode({
version: this.version,
assignment: assignment[memberId],
})
}))
}
})
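How `assignment` is built is up to the assigner. As a sketch, the helper below spreads partitions over members in round-robin order and produces one `{ topic: [partitions] }` object per member. The name `buildAssignment`, its input shapes, and the per-member object layout are assumptions made for illustration; check them against the MemberAssignment utility before relying on them.

```javascript
// Hypothetical helper. Input shapes are assumptions:
//   memberIds:       ['member-a', 'member-b']
//   topicPartitions: [{ topic: 'topic-A', partitionId: 0 }, ...]
const buildAssignment = (memberIds, topicPartitions) => {
  // Sort so that every run produces the same plan for the same inputs
  const sortedMembers = [...memberIds].sort()
  const assignment = {}
  for (const memberId of sortedMembers) assignment[memberId] = {}

  topicPartitions.forEach(({ topic, partitionId }, index) => {
    // Hand out partitions one at a time, cycling through the members
    const memberId = sortedMembers[index % sortedMembers.length]
    if (!assignment[memberId][topic]) assignment[memberId][topic] = []
    assignment[memberId][topic].push(partitionId)
  })
  return assignment
}

const plan = buildAssignment(
  ['member-b', 'member-a'],
  [
    { topic: 'topic-A', partitionId: 0 },
    { topic: 'topic-A', partitionId: 1 },
    { topic: 'topic-A', partitionId: 2 }
  ]
)
console.log(plan)
```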
The method protocol has to return name and metadata. Metadata has to be encoded; use the MemberMetadata utility for that. Example:
const { AssignerProtocol: { MemberMetadata } } = require('kafkajs')
const MyPartitionAssigner = ({ cluster }) => ({
name: 'MyPartitionAssigner',
version: 1,
protocol({ topics }) {
return {
name: this.name,
metadata: MemberMetadata.encode({
version: this.version,
topics,
}),
}
}
})
Your protocol method will probably look like the example, but it's not implemented by default because extra data can be included as userData. Take a look at MemberMetadata#encode for more information.
Once your assigner is done, add it to the list of assigners. It's important to keep the default assigner there to allow the old consumers to have a common ground with the new consumers when deploying.
const { PartitionAssigners: { roundRobin } } = require('kafkajs')
kafka.consumer({
groupId: 'my-group',
partitionAssigners: [
MyPartitionAssigner,
roundRobin
]
})
Describe group
Experimental - This feature may be removed or changed in new versions of KafkaJS
Returns metadata for the configured consumer group, example:
const data = await consumer.describeGroup()
Instrumentation
Experimental - This feature may be removed or changed in new versions of KafkaJS
Some operations are instrumented using the EventEmitter. To receive the events use the method consumer#on, example:
const { HEARTBEAT } = consumer.events
const removeListener = consumer.on(HEARTBEAT, e => console.log(`heartbeat at ${e.timestamp}`))
The listeners are always async, even when using regular functions. The consumer will never block when executing your listeners. Errors in the listeners won't affect the consumer.
List of available events:
- consumer.events.HEARTBEAT
- consumer.events.COMMIT_OFFSETS
Instrumentation Event:
{
id: <Number>,
type: <String>,
timestamp: <Number>,
payload: <Object>
}
Custom logger
The logger is customized using log creators. A log creator is a function which receives a log level and returns a log function. The log function receives namespace, level, label, and log.
namespace identifies the component which is performing the log, for example, connection or consumer.
level is the log level of the log entry.
label is a text representation of the log level, example: 'INFO'.
log is an object with the following keys: timestamp, logger, message, and the extra keys given by the user (for example, logger.info('test', { extra_data: true })).
{
level: 4,
label: 'INFO',
timestamp: '2017-12-29T13:39:54.575Z',
logger: 'kafkajs',
message: 'Started',
}
The general structure looks like this:
const MyLogCreator = logLevel => ({ namespace, level, label, log }) => {
}
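A dependency-free sketch may make the shape clearer. `createMemoryLogCreator` is a hypothetical name; it collects formatted lines in an array instead of writing to STDOUT, which can be handy in tests.

```javascript
// Hypothetical log creator that stores formatted lines in `sink`
const createMemoryLogCreator = sink => logLevel => ({ namespace, level, label, log }) => {
  // Separate the message from the remaining keys of the log entry
  const { message, ...extra } = log
  sink.push(`[${label}] ${namespace}: ${message} ${JSON.stringify(extra)}`)
}

const lines = []
// 4 is the numeric level shown for INFO in the sample entry above
const logFn = createMemoryLogCreator(lines)(4)
logFn({
  namespace: 'consumer',
  level: 4,
  label: 'INFO',
  log: { timestamp: '2017-12-29T13:39:54.575Z', logger: 'kafkajs', message: 'Started' }
})
console.log(lines[0])
```

The value passed to the logCreator option would be `createMemoryLogCreator(lines)`, since that is the function which receives the log level.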
Example using winston:
const { logLevel } = require('kafkajs')
const winston = require('winston')
// Map KafkaJS log levels to winston level names
const toWinstonLogLevel = level => {
  switch (level) {
    case logLevel.ERROR:
    case logLevel.NOTHING:
      return 'error'
    case logLevel.WARN:
      return 'warn'
    case logLevel.INFO:
      return 'info'
    case logLevel.DEBUG:
      return 'debug'
  }
}
const WinstonLogCreator = logLevel => {
  const logger = winston.createLogger({
    level: toWinstonLogLevel(logLevel),
    transports: [
      new winston.transports.Console(),
      new winston.transports.File({ filename: 'myapp.log' })
    ]
  })
  return ({ namespace, level, label, log }) => {
    // Separate the message from any extra keys on the log entry
    const { message, ...extra } = log
    logger.log({
      level: toWinstonLogLevel(level),
      message,
      extra,
    })
  }
}
Once you have your log creator you can use the logCreator
option to configure the client:
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['kafka1:9092', 'kafka2:9092'],
logLevel: logLevel.ERROR,
logCreator: WinstonLogCreator
})
Development
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
https://kafka.apache.org/protocol.html
yarn test
or
./scripts/dockerComposeUp.sh
yarn test:local
Password for test keystore and certificates: testtest
Password for SASL test: testtest
Acknowledgements
Thanks to Sebastian Norde for the awesome logo!
License
See LICENSE for more details.