kafka-avro

Node.js bindings for librdkafka with Avro schema serialization.

The kafka-avro library is a wrapper that combines the node-rdkafka and avsc libraries, allowing you to produce and consume Kafka messages that are validated and serialized with Avro.

Install

Install the module using NPM:

npm install kafka-avro --save

Documentation

The kafka-avro library operates in the following steps:

  1. You provide your Kafka Brokers and Schema Registry (SR) URL to a new instance of kafka-avro.
  2. You initialize kafka-avro, which tells the library to query the SR for all registered schemas, evaluate them, and store them in runtime memory.
  3. kafka-avro then exposes the getConsumer() and getProducer() methods, which both return instances of the corresponding constructors from the node-rdkafka library.

The node-rdkafka instances returned by kafka-avro are patched to intercept produced and consumed messages and run them through the Avro serializer/deserializer.
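
Conceptually, the interception looks something like the following sketch (a simplified illustration, not the library's actual source; serializeForTopic stands in for the avsc-based serializer that kafka-avro builds from the registered schemas):

// Simplified sketch of the produce() interception (illustration only).
function wrapProduce(producer, serializeForTopic) {
  var origProduce = producer.produce.bind(producer);
  producer.produce = function(topic, partition, value, key) {
    // Validate and Avro-serialize the value before it reaches node-rdkafka.
    var serialized = serializeForTopic(topic, value);
    return origProduce(topic, partition, serialized, key);
  };
}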

You are highly encouraged to read the "node-rdkafka" documentation, as it explains how to use the Producer and Consumer instances and lists the available configuration options.

node-rdkafka CODES

The Kafka.CODES enumeration of constant values provided by the "node-rdkafka" library is also available as a static property:

var KafkaAvro = require('kafka-avro');

console.log(KafkaAvro.CODES);
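
For example, you can branch on well-known librdkafka error codes in an error handler (a sketch; ERR__PARTITION_EOF is assumed to be part of node-rdkafka's CODES.ERRORS enumeration):

var KafkaAvro = require('kafka-avro');

// Returns true when the error merely signals that the consumer reached
// the end of a partition, which is typically not fatal.
function isPartitionEof(err) {
  return err.code === KafkaAvro.CODES.ERRORS.ERR__PARTITION_EOF;
}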

Initialize kafka-avro

var KafkaAvro = require('kafka-avro');

var kafkaAvro = new KafkaAvro({
    kafkaBroker: 'localhost:9092',
    schemaRegistry: 'localhost:8081',
});

// Query the Schema Registry for all topic schemas,
// fetch them and evaluate them.
kafkaAvro.init()
    .then(function() {
        console.log('Ready to use');
    });
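
Since init() returns a promise, a failed initialization (for example an unreachable Schema Registry) can be handled with a rejection handler:

kafkaAvro.init()
    .then(function() {
        console.log('Ready to use');
    })
    .catch(function(err) {
        // e.g. the Schema Registry could not be reached
        console.error('kafka-avro failed to initialize:', err);
    });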

Producer

NOTICE: You need to initialize kafka-avro before you can produce or consume messages.

By invoking the kafkaAvro.getProducer() method, kafka-avro will instantiate a Producer, connect it, and wait until it is ready before resolving the promise.

kafkaAvro.getProducer({
  // Options listed below
})
    // "getProducer()" returns a Bluebird Promise.
    .then(function(producer) {
        var topicName = 'test';

        producer.on('disconnected', function(arg) {
          console.log('producer disconnected. ' + JSON.stringify(arg));
        });

        // Create a Topic object with any options our Producer
        // should use when producing to that topic.
        var topic = producer.Topic(topicName, {
          // Make the Kafka broker acknowledge our message (optional)
          'request.required.acks': 1
        });

        var value = new Buffer('value');
        var key = 'key';

        // if partition is set to -1, librdkafka will use the default partitioner
        var partition = -1;
        producer.produce(topic, partition, value, key);
    });

In essence, kafka-avro wraps node-rdkafka and intercepts the produce method to validate and serialize the message.
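
When you are done producing, disconnect the producer so the connection is torn down cleanly; the 'disconnected' event registered above will then fire (a minimal sketch, using node-rdkafka's disconnect() method):

// Inside the getProducer() then-handler, once all messages are produced:
producer.disconnect();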

Consumer

NOTICE: You need to initialize kafka-avro before you can produce or consume messages.

By invoking the kafkaAvro.getConsumer() method, kafka-avro will instantiate a Consumer, connect it, and wait until it is ready before resolving the promise.

Consumer using events to consume
kafkaAvro.getConsumer({
  'group.id': 'librd-test',
  'socket.keepalive.enable': true,
  'enable.auto.commit': true,
})
  // the "getConsumer()" method will return a bluebird promise.
  .then(function(consumer) {
    var topicName = 'test';
    this.consumer.consume([topicName]);
    this.consumer.on('data', function(rawData) {
      console.log('data:', rawData);
    });
  });
Consumer using streams to consume
kafkaAvro.getConsumer({
  'group.id': 'librd-test',
  'socket.keepalive.enable': true,
  'enable.auto.commit': true,
})
    // the "getConsumer()" method will return a bluebird promise.
    .then(function(consumer) {
        var topicName = 'test';

        var stream = consumer.getReadStream(topicName, {
          waitInterval: 0
        });

        stream.on('error', function(err) {
          console.log(err);
          process.exit(1);
        });

        consumer.on('error', function(err) {
          console.log(err);
          process.exit(1);
        });

        stream.on('data', function(message) {
            console.log('Received message:', message);
        });
    });

The same applies here: kafka-avro is a thin wrapper around node-rdkafka that deserializes incoming messages before they reach your consuming method.

Consumer Data Object

kafka-avro intercepts all incoming messages and augments the object with one more property named parsed, which contains the Avro-deserialized object. Here is a breakdown of the properties included in the message object you receive when consuming messages:

  • value Buffer The raw message buffer from Kafka.
  • size Number The size of the message.
  • key String|Number The partitioning key used.
  • topic String The topic this message comes from.
  • offset Number The Kafka offset.
  • partition Number The Kafka partition used.
  • parsed Object The Avro-deserialized message as a JS object literal.
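
For example, a data handler will typically read the parsed property instead of decoding the raw value buffer itself (a sketch, assuming a consumer obtained from getConsumer() as shown above):

consumer.on('data', function(message) {
  // "parsed" is the Avro-deserialized payload; "value" is the raw Buffer.
  console.log('topic:', message.topic, 'offset:', message.offset);
  console.log('parsed payload:', message.parsed);
});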

Releasing

  1. Update the changelog below.
  2. Ensure you are on master.
  3. Type: grunt release
    • grunt release:minor for minor number jump.
    • grunt release:major for major number jump.

Release History

  • v0.1.1, 27 Jan 2016
    • Expose CODES enum from node-rdkafka.
    • Write more docs, add the event based consuming method.
  • v0.1.0, 26 Jan 2016
    • First fully working release.
  • v0.0.1, 25 Jan 2016
    • Big Bang

License

Copyright Waldo, Inc. All rights reserved.
