What is kafkajs?
KafkaJS is a modern Apache Kafka client for Node.js. It is designed to be simple, reliable, and performant, making it easy to interact with Kafka brokers and manage Kafka topics, producers, and consumers.
What are kafkajs's main functionalities?
Producer
This code sample demonstrates how to create a Kafka producer using KafkaJS. The producer connects to the Kafka broker, sends a message to a specified topic, and then disconnects.
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['kafka1:9092', 'kafka2:9092']
});

const producer = kafka.producer();

const run = async () => {
  await producer.connect();
  await producer.send({
    topic: 'test-topic',
    messages: [
      { value: 'Hello KafkaJS user!' }
    ]
  });
  await producer.disconnect();
};

run().catch(console.error);
Consumer
This code sample demonstrates how to create a Kafka consumer using KafkaJS. The consumer connects to the Kafka broker, subscribes to a specified topic, and logs each message received.
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['kafka1:9092', 'kafka2:9092']
});

const consumer = kafka.consumer({ groupId: 'test-group' });

const run = async () => {
  await consumer.connect();
  await consumer.subscribe({ topic: 'test-topic', fromBeginning: true });
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        partition,
        offset: message.offset,
        value: message.value.toString(),
      });
    },
  });
};

run().catch(console.error);
Admin
This code sample demonstrates how to use the admin client in KafkaJS to create a new topic. The admin client connects to the Kafka broker, creates a topic with specified configurations, and then disconnects.
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['kafka1:9092', 'kafka2:9092']
});

const admin = kafka.admin();

const run = async () => {
  await admin.connect();
  await admin.createTopics({
    topics: [
      { topic: 'test-topic', numPartitions: 1 }
    ]
  });
  await admin.disconnect();
};

run().catch(console.error);
Other packages similar to kafkajs
node-rdkafka
node-rdkafka is a high-performance Node.js client for Apache Kafka built on the C/C++ library librdkafka. It offers more advanced features and better raw performance than KafkaJS, but it depends on compiling the native librdkafka bindings at install time and can be more complex to set up and use.
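For comparison, a minimal sketch of producing a message with node-rdkafka (not taken from this document; the broker address and topic are placeholders), illustrating its more verbose, event-driven API:

const Kafka = require('node-rdkafka');

const producer = new Kafka.Producer({
  'metadata.broker.list': 'kafka1:9092',
  'dr_cb': true // request delivery reports
});

producer.connect();

producer.on('ready', () => {
  try {
    // produce(topic, partition, message, key)
    producer.produce('test-topic', null, Buffer.from('Hello from node-rdkafka!'), 'key1');
  } catch (err) {
    console.error('Failed to send message', err);
  }
});

producer.on('event.error', (err) => {
  console.error('Producer error', err);
});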
kafka-node
kafka-node is another popular Kafka client for Node.js. It is simpler and easier to use compared to node-rdkafka but may not offer the same level of performance and advanced features as node-rdkafka or KafkaJS.
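For comparison, a minimal sketch of the equivalent with kafka-node (not taken from this document; the broker address and topic are placeholders):

const kafka = require('kafka-node');

const client = new kafka.KafkaClient({ kafkaHost: 'kafka1:9092' });
const producer = new kafka.Producer(client);

producer.on('ready', () => {
  // send() takes an array of payloads and a node-style callback
  producer.send([{ topic: 'test-topic', messages: 'Hello from kafka-node!' }], (err, result) => {
    if (err) {
      console.error('Failed to send message', err);
    } else {
      console.log(result);
    }
  });
});

producer.on('error', (err) => console.error('Producer error', err));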
no-kafka
no-kafka is a pure JavaScript client for Apache Kafka. It is designed to be simple and easy to use, similar to KafkaJS, but it may not be as actively maintained or feature-rich as KafkaJS.

KafkaJS
A modern Apache Kafka client for node.js
In active development - early alpha
- Fully working producer compatible with 0.10.x (0.9.x will be possible soon)
- Fully working consumer groups compatible with 0.10.x (0.9.x will be possible soon)
- GZIP compression (see the compression sketch after the producer example below)
- Plain, SSL and SASL_SSL implementations
Usage
Setting up the Client
const { Kafka } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['kafka1:9092', 'kafka2:9092']
})
Producing Messages to Kafka
const producer = kafka.producer()

const run = async () => {
  await producer.connect()
  await producer.send({
    topic: 'topic-name',
    messages: [
      { key: 'key1', value: 'hello world' },
      { key: 'key2', value: 'hey hey!' }
    ],
  })
  await producer.disconnect()
}

run().catch(console.error)
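As listed in the features above, the producer supports GZIP compression. A minimal sketch of sending a compressed batch, assuming the CompressionTypes export found in current KafkaJS releases (the exact option name in this early alpha may differ):

const { Kafka, CompressionTypes } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['kafka1:9092', 'kafka2:9092']
})

const producer = kafka.producer()

const run = async () => {
  await producer.connect()
  // The whole message batch is compressed with GZIP before it is sent to the broker
  await producer.send({
    topic: 'topic-name',
    compression: CompressionTypes.GZIP,
    messages: [
      { key: 'key1', value: 'hello world' }
    ],
  })
  await producer.disconnect()
}

run().catch(console.error)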
Consuming messages with consumer groups
const consumer = kafka.consumer({ groupId: 'my-group' })

const run = async () => {
  await consumer.connect()
  await consumer.subscribe({ topic: 'topic-name' })
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        key: message.key.toString(),
        value: message.value.toString()
      })
    },
  })
  // Call consumer.disconnect() when shutting down to leave the group cleanly
}

run().catch(console.error)
It's also possible to consume batches instead of individual messages, for example:
await consumer.run({
  eachBatch: async ({ batch, resolveOffset, heartbeat }) => {
    for (let message of batch.messages) {
      console.log({
        topic: batch.topic,
        partition: batch.partition,
        highWatermark: batch.highWatermark,
        message: {
          offset: message.offset,
          key: message.key.toString(),
          value: message.value.toString()
        }
      })
      resolveOffset(message.offset)
    }
  },
})
Configure SSL and SASL
const fs = require('fs')
const { Kafka } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['kafka1:9092', 'kafka2:9092'],
  ssl: {
    cert: fs.readFileSync('<path/to>/client_cert.pem', 'utf-8'),
    key: fs.readFileSync('<path/to>/client_key.pem', 'utf-8'),
    ca: [fs.readFileSync('<path/to>/ca_cert.pem', 'utf-8')],
  },
  sasl: {
    mechanism: 'plain',
    username: 'my-username',
    password: 'my-password',
  },
})
Development
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
http://kafka.apache.org/protocol.html
yarn test
or
./scripts/dockerComposeUp.sh
yarn test:local
Password for test keystore and certificates: testtest
Password for SASL test: testtest