kafka-avro
Node.js bindings for librdkafka with Avro schema serialization.
The kafka-avro library is a wrapper that combines the node-rdkafka and avsc libraries to allow for Production and Consumption of messages on kafka validated and serialized by Avro.
Install
Install the module using NPM:
npm install kafka-avro --save
Documentation
The kafka-avro library operates in the following steps:
- You provide your Kafka Brokers and Schema Registry (SR) Url to a new instance of kafka-avro.
- You initialize kafka-avro, that will tell the library to query the SR for all registered schemas, evaluate and store them in runtime memory.
- kafka-avro will then expose the
getConsumer()
and getProducer()
methods, which both return instances of the corresponding Constructors from the node-rdkafka library.
The instances of "node-rdkafka" that are returned by kafka-avro are hacked so as to intercept produced and consumed messages and run them by the Avro de/serializer along with Confluent's Schema Registry Magic Byte and Schema Id.
You are highly encouraged to read the "node-rdkafka" documentation, as it explains how you can use the Producer and Consumer instances as well as check out the available configuration options of node-rdkafka.
node-rdkafka CODES
The Kafka.CODES
enumeration of constant values provided by the "node-rdkafka" library is also available as a static var at:
var KafkaAvro = require('kafka-avro');
console.log(KafkaAvro.CODES);
Initialize kafka-avro
var KafkaAvro = require('kafka-avro');
var kafkaAvro = new KafkaAvro({
kafkaBroker: 'localhost:9092',
schemaRegistry: 'localhost:8081',
});
kafkaAvro.init()
.then(function() {
console.log('Ready to use');
});
Kafka-avro options
When instantiating kafka-avro you may pass the following options:
kafkaBroker
String REQUIRED The url or comma delimited strings pointing to your kafka brokers.schemaRegistry
String REQUIRED The url to the Schema Registry.
Producer
NOTICE: You need to initialize kafka-avro before you can produce or consume messages.
By invoking the kafkaAvro.getProducer()
method, kafka-avro will instantiate a Producer, make it connect and wait for it to be ready before the promise is resolved.
kafkaAvro.getProducer({
})
.then(function(producer) {
var topicName = 'test';
producer.on('disconnected', function(arg) {
console.log('producer disconnected. ' + JSON.stringify(arg));
});
var topic = producer.Topic(topicName, {
'request.required.acks': 1
});
var value = new Buffer('value-' +i);
var key = 'key';
var partition = -1;
producer.produce(topic, partition, value, key);
})
What kafka-avro basically does is wrap around node-rdkafka and intercept the produce method to validate and serialize the message.
Consumer
NOTICE: You need to initialize kafka-avro before you can produce or consume messages.
By inoking the kafkaAvro.getConsumer()
method, kafka-avro will instantiate a Consumer, make it connect and wait for it to be ready before the promise is resolved.
Consumer using events to consume
kafkaAvro.getConsumer({
'group.id': 'librd-test',
'socket.keepalive.enable': true,
'enable.auto.commit': true,
})
.then(function(consumer) {
var topicName = 'test';
this.consumer.subscribe([topicName]);
this.consumer.consume();
this.consumer.on('data', function(rawData) {
console.log('data:', rawData);
});
});
Consumer using streams to consume
kafkaAvro.getConsumer({
'group.id': 'librd-test',
'socket.keepalive.enable': true,
'enable.auto.commit': true,
})
.then(function(consumer) {
var topicName = 'test';
var stream = consumer.getReadStream(topicName, {
waitInterval: 0
});
stream.on('error', function() {
process.exit(1);
});
consumer.on('error', function(err) {
console.log(err);
process.exit(1);
});
stream.on('data', function(message) {
console.log('Received message:', message);
});
});
Same deal here, thin wrapper around node-rdkafka and deserialize incoming messages before they reach your consuming method.
Consumer Data Object
kafka-avro intercepts all incoming messages and augments the object with one more property named parsed
which contained the avro deserialized object. Here is a breakdown of the properties included in the message
object you receive when consuming messages:
value
Buffer The raw message buffer from Kafka.size
Number The size of the message.key
String|Number Partioning key used.topic
String The topic this message comes from.offset
Number The Kafka offset.partition
Number The kafka partion used.parsed
Object The avro deserialized message as a JS Object ltieral.
The KafkaAvro instance also provides the following methods:
Logging
The Kafka Avro library logs messages using the Bunyan logger.
KafkaAvro.getLogger()
WARNING The logger will not emit any messages as it was expected, there is an open issue on Bunyan's repository pending a solution on this. So no logging for now.
NOTICE This is a static method on the KafkaAvro
constructor, not the instance. Therefore there is a single logger instance for the whole runtime.
Returns {Bunyan.Logger} Bunyan logger instance.
var KafkaAvro = require('kafka-avro');
var fmt = require('bunyan-format');
var kafkaLog = KafkaAvro.getLogger();
kafkaLog.addStream({
type: 'stream',
stream: fmt({
outputMode: 'short',
levelInString: true,
}),
level: 'info',
});
Read more about the bunyan-format package.
Helper Methods
kafkaAvro.serialize(type, schemaId, value)
Serialize the provided value with Avro, including the magic Byte and schema id provided.
Returns {Buffer} Serialized buffer message.
type
{avsc.Type} The avro type instance.schemaId
{number} The Schema Id in the SR.value
{*} Any value to serialize.
kafkaAvro.deserialize(type, message)
Deserialize the provided message, expects a message that includes Magic Byte and schema id.
Returns {*} Deserialized message.
type
{avsc.Type} The avro type instance.message
{Buffer} Message in byte code.
Testing
Use the kafka-avro-stub library to avoid requiring Kafka and Schema Registry to run on your local for testing your service.
Releasing
- Update the changelog bellow.
- Ensure you are on master.
- Type:
grunt release
grunt release:minor
for minor number jump.grunt release:major
for major number jump.
Release History
- v0.5.0, 15 Feb 2017
- Upgrade to node-rdkafka
0.7.0-ALPHA.3
which changes the consumer API by decoupling subscribing from consuming.
- v0.4.3, 15 Feb 2017
- Locked this version to
0.7.0-ALPHA.2
of node-rdkafka which broke BC in 0.7.0-ALPHA.3
.
- v0.4.2, 15 Feb 2017
- Fixed
connect()
invocation for consumers and producers.
- v0.4.1, 10 Feb 2017
- Fixed relaying Kafka consumer logs.
- v0.4.0, 03 Feb 2017
- Refactored all logging to use a central Bunyan logger that is now provided through the static method
KafkaAvro.getLogger()
. - Allowed for an Array of strings as topic argument for Consumer's
getReadStream()
method.
- v0.3.0, 01 Feb 2017
- Now force uses Magic Byte in any occasion when de/serializing.
- Exposed
serialize()
and deserialize()
methods. - Fixed de/serializing of topics not found in the Schema Registry.
- Tweaked log namespaces, still more work required to eventize them.
- v0.2.0, 30 Jan 2017
- Added Confluent's Magic Byte support when encoding and decoding messages.
- v0.1.2, 27 Jan 2017
- Suppress schema parsing errors.
- v0.1.1, 27 Jan 2017
- Fix signature of
getConsumer()
method.
- v0.1.1, 27 Jan 2017
- Expose CODES enum from node-rdkafka.
- Write more docs, add the event based consuming method.
- v0.1.0, 26 Jan 2017
- First fully working release.
- v0.0.1, 25 Jan 2017
License
Copyright Waldo, Inc. Licensed under the MIT.