kafka-avro
Node.js bindings for librdkafka with Avro schema serialization.
The kafka-avro library is a wrapper that combines the node-rdkafka and avsc libraries to allow producing and consuming messages on Kafka, validated and serialized with Avro.
Install
Install the module using NPM:
npm install kafka-avro --save
Documentation
The kafka-avro library operates in the following steps:
- You provide your Kafka Brokers and Schema Registry (SR) Url to a new instance of kafka-avro.
- You initialize kafka-avro, which tells the library to query the SR for all registered schemas, evaluate them, and store them in runtime memory.
- kafka-avro will then expose the getConsumer() and getProducer() methods, which both return instances of the corresponding constructors from the node-rdkafka library.
The instances of "node-rdkafka" that are returned by kafka-avro are hacked so as to intercept produced and consumed messages and run them by the Avro de/serializer.
You are highly encouraged to read the node-rdkafka documentation on how to use the Producer and Consumer instances, as well as to check out the available configuration options of node-rdkafka.
Initialize kafka-avro
var KafkaAvro = require('kafka-avro');

var kafkaAvro = new KafkaAvro({
    kafkaBroker: 'localhost:9092',
    schemaRegistry: 'localhost:8081',
});

kafkaAvro.on('log', function(message) {
    console.log(message);
});

kafkaAvro.init()
    .then(function() {
        console.log('Ready to use');
    });
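Since kafkaAvro.init() returns a promise, a rejection handler can also catch initialization failures, such as an unreachable Schema Registry. A minimal sketch, assuming only standard promise semantics:

kafkaAvro.init()
    .then(function() {
        console.log('Ready to use');
    })
    .catch(function(err) {
        // e.g. the Schema Registry could not be reached
        console.error('kafka-avro failed to initialize:', err);
        process.exit(1);
    });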
Quick Usage Producer
NOTICE: You need to initialize kafka-avro before you can produce or consume messages.
var producer = kafkaAvro.getProducer({
    // Any node-rdkafka producer configuration options go here.
});

var topicName = 'test';

producer.on('ready', function() {
    var topic = producer.Topic(topicName, {
        'request.required.acks': 1
    });

    var value = new Buffer('value');
    var key = 'key';

    // A partition of -1 lets librdkafka choose the partition.
    var partition = -1;

    producer.produce(topic, partition, value, key);
});

producer.on('disconnected', function(arg) {
    console.log('producer disconnected. ' + JSON.stringify(arg));
});

producer.connect();
What kafka-avro basically does is wrap around node-rdkafka and intercept the produce method to validate and serialize the message.
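Conceptually, the interception works like the sketch below. This is illustrative only, not the library's actual internals: the wrapping shown here and the hard-coded schema are assumptions, while the serialization itself is done with avsc, whose parse() and toBuffer() handle the validating and encoding:

// Illustrative sketch, not kafka-avro's actual internals.
var avsc = require('avsc');

// In kafka-avro the type is built from the schemas fetched off the
// Schema Registry; here it is hard-coded for demonstration.
var type = avsc.parse({
    type: 'record',
    name: 'Value',
    fields: [{name: 'name', type: 'string'}],
});

var originalProduce = producer.produce.bind(producer);
producer.produce = function(topic, partition, value, key) {
    // toBuffer() validates the plain JS object against the schema and
    // encodes it to an Avro buffer before it reaches node-rdkafka.
    var serialized = type.toBuffer(value);
    return originalProduce(topic, partition, serialized, key);
};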
Quick Usage Consumer
NOTICE: You need to initialize kafka-avro before you can produce or consume messages.
var consumer = kafkaAvro.getConsumer({
    'group.id': 'librd-test',
    'socket.keepalive.enable': true,
    'enable.auto.commit': true,
});

var topicName = 'test';

var stream = consumer.getReadStream(topicName, {
    waitInterval: 0
});

stream.on('error', function(err) {
    console.error(err);
    process.exit(1);
});

consumer.on('error', function(err) {
    console.log(err);
    process.exit(1);
});

stream.on('data', function(message) {
    console.log('Received message:', message);
});
Same deal here: a thin wrapper around node-rdkafka that deserializes incoming messages before they reach your consuming method.
Consumer Data Object
kafka-avro intercepts all incoming messages and augments the object with one more property named parsed, which contains the Avro-deserialized object. Here is a breakdown of the properties included in the message object you receive when consuming messages:
- value (Buffer) The raw message buffer from Kafka.
- size (Number) The size of the message.
- key (String|Number) The partitioning key used.
- topic (String) The topic this message comes from.
- offset (Number) The Kafka offset.
- partition (Number) The Kafka partition used.
- parsed (Object) The Avro-deserialized message as a JS Object literal.
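For example, assuming a produced Avro record like {name: 'Thanasis'} (the field name here is illustrative), a data handler can read both the raw and the parsed forms:

stream.on('data', function(message) {
    // Raw Avro-encoded bytes as they arrived from Kafka.
    console.log('Raw buffer:', message.value);

    // The deserialized JS object, e.g. {name: 'Thanasis'}.
    console.log('Parsed name:', message.parsed.name);
});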
Releasing
- Update the changelog below.
- Ensure you are on master.
- Type: grunt release
  - grunt release:minor for a minor number jump.
  - grunt release:major for a major number jump.
Release History
- v0.1.0, 26 Jan 2016
  - First fully working release.
- v0.0.1, 25 Jan 2016
License
Copyright Waldo, Inc. All rights reserved.