kafka-avro
Node.js bindings for librdkafka with Avro schema serialization.
The kafka-avro library is a wrapper that combines the node-rdkafka and avsc libraries so that messages produced to and consumed from Kafka are validated and serialized with Avro.
Install the module using NPM:
npm install kafka-avro --save
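The kafka-avro library operates in the following steps:
1. You provide your Kafka brokers and Schema Registry URL to a new instance of kafka-avro.
2. You initialize kafka-avro, which queries the Schema Registry for the registered schemas, fetches them and evaluates them.
3. kafka-avro then exposes the getConsumer() and getProducer() methods, which both return instances of the corresponding constructors from the node-rdkafka library.
The instances of "node-rdkafka" returned by kafka-avro are patched to intercept produced and consumed messages and run them through the Avro de/serializer, along with Confluent's Schema Registry magic byte and schema id.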
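To make the magic byte and schema id concrete, here is a minimal sketch of the Confluent wire format framing: one magic byte (0), a 4-byte big-endian schema id, then the Avro-encoded payload. The frame() and unframe() helpers are hypothetical names used for illustration and are not kafka-avro's own code; the payload is a placeholder buffer standing in for Avro-encoded bytes.

```javascript
// Sketch of the Confluent wire format framing (hypothetical helpers, not kafka-avro's code).
const MAGIC_BYTE = 0;

// Prefix an already Avro-encoded payload with the magic byte and schema id.
function frame(schemaId, avroPayload) {
  const header = Buffer.alloc(5);
  header.writeUInt8(MAGIC_BYTE, 0);
  header.writeInt32BE(schemaId, 1);
  return Buffer.concat([header, avroPayload]);
}

// Split a framed message back into its schema id and payload.
function unframe(message) {
  if (message.length < 5 || message.readUInt8(0) !== MAGIC_BYTE) {
    throw new Error('Message is not framed with the expected magic byte');
  }
  return {
    schemaId: message.readInt32BE(1),
    payload: message.subarray(5),
  };
}

// Placeholder bytes standing in for an Avro-encoded value.
const placeholderPayload = Buffer.from([0x08, 0x4a, 0x6f, 0x68, 0x6e]);
const framed = frame(42, placeholderPayload);
const unframed = unframe(framed);
console.log(framed.length, unframed.schemaId);
```

A consumer would use the schema id recovered from the header to pick the matching schema before decoding the payload.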
You are highly encouraged to read the "node-rdkafka" documentation, as it explains how you can use the Producer and Consumer instances as well as check out the available configuration options of node-rdkafka.
The Kafka.CODES enumeration of constant values provided by the "node-rdkafka" library is also available as a static property:
var KafkaAvro = require('kafka-avro');
console.log(KafkaAvro.CODES);
var KafkaAvro = require('kafka-avro');

var kafkaAvro = new KafkaAvro({
  kafkaBroker: 'localhost:9092',
  schemaRegistry: 'http://localhost:8081',
});

// Query the Schema Registry for all topic schemas,
// then fetch and evaluate them.
kafkaAvro.init()
  .then(function() {
    console.log('Ready to use');
  });
When instantiating kafka-avro you may pass the following options:
kafkaBroker (String, REQUIRED): The URL, or a comma-delimited string of URLs, pointing to your Kafka brokers.
schemaRegistry (String, REQUIRED): The URL of the Schema Registry.
topics (Array of Strings): Optionally restricts kafka-avro to fetching schemas for specific topics; by default it fetches schemas for all topics.
fetchAllVersions (Boolean): Set to true to fetch all versions for each topic; use it when schemas are updated often in your environment.
fetchRefreshRate (Number): The polling interval (in seconds) at which schemas are fetched and updated in the background. This is useful for keeping up with schema changes in production. The default value is 0 seconds (disabled).
parseOptions (Object): Schema parse options to pass to avro.parse(). parseOptions.wrapUnions is set to true by default.
httpsAgent (Object): An initialized https Agent instance.
shouldFailWhenSchemaIsMissing (Boolean): Set to true if producing a message for which no Avro schema can be found should throw an error.
NOTICE: You need to initialize kafka-avro before you can produce or consume messages.
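As a recap of the instantiation options listed above, the following sketch checks the two required options and fills in the documented defaults. The normalizeOptions() helper is hypothetical and not part of kafka-avro; treating fetchAllVersions and shouldFailWhenSchemaIsMissing as false when omitted is an assumption, since the list above only says what setting them to true does.

```javascript
// Hypothetical helper (not part of kafka-avro): validates the required options
// and applies the defaults described in the options list.
function normalizeOptions(opts) {
  if (typeof opts.kafkaBroker !== 'string' || opts.kafkaBroker === '') {
    throw new TypeError('kafkaBroker is required and must be a string');
  }
  if (typeof opts.schemaRegistry !== 'string' || opts.schemaRegistry === '') {
    throw new TypeError('schemaRegistry is required and must be a string');
  }
  return {
    kafkaBroker: opts.kafkaBroker,
    schemaRegistry: opts.schemaRegistry,
    topics: opts.topics, // undefined means "fetch schemas for all topics"
    fetchAllVersions: opts.fetchAllVersions === true, // assumed default: false
    fetchRefreshRate: opts.fetchRefreshRate || 0, // documented default: 0 (disabled)
    parseOptions: Object.assign({ wrapUnions: true }, opts.parseOptions),
    httpsAgent: opts.httpsAgent,
    shouldFailWhenSchemaIsMissing: opts.shouldFailWhenSchemaIsMissing === true, // assumed default: false
  };
}

const normalized = normalizeOptions({
  kafkaBroker: 'localhost:9092',
  schemaRegistry: 'http://localhost:8081',
  topics: ['test'],
});
console.log(normalized.fetchRefreshRate, normalized.parseOptions.wrapUnions);
```

The resulting object has the same shape as the options passed to the KafkaAvro constructor in the instantiation example.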
By invoking the kafkaAvro.getProducer() method, kafka-avro will instantiate a Producer, make it connect and wait for it to be ready before the promise is resolved.
kafkaAvro.getProducer({
  // Options listed below
})
  // "getProducer()" returns a Bluebird Promise.
  .then(function(producer) {
    var topicName = 'test';

    producer.on('disconnected', function(arg) {
      console.log('producer disconnected. ' + JSON.stringify(arg));
    });

    var value = {name:'John'};
    var key = 'key';

    // If partition is set to -1, librdkafka will use the default partitioner.
    var partition = -1;
    producer.produce(topicName, partition, value, key);
  })
What kafka-avro basically does is wrap around node-rdkafka and intercept the produce method to validate and serialize the message.
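The interception idea can be sketched as follows. This is an illustration of the general wrapping technique, not kafka-avro's actual implementation: fakeProducer stands in for a node-rdkafka Producer, and serializeForTopic is a hypothetical serializer that uses JSON in place of Avro so the example runs without a broker or Schema Registry.

```javascript
// Stand-in for a node-rdkafka Producer; it only records what it is given.
const fakeProducer = {
  sent: [],
  produce(topic, partition, message, key) {
    this.sent.push({ topic, partition, message, key });
  },
};

// Hypothetical serializer standing in for Avro: validates, then encodes as JSON bytes.
function serializeForTopic(topic, value) {
  if (typeof value !== 'object' || value === null) {
    throw new TypeError('Value for topic "' + topic + '" must be an object');
  }
  return Buffer.from(JSON.stringify(value));
}

// Replace produce() with a wrapper that serializes the value and then
// delegates to the original method.
function interceptProduce(producer, serialize) {
  const originalProduce = producer.produce.bind(producer);
  producer.produce = function (topic, partition, value, key) {
    return originalProduce(topic, partition, serialize(topic, value), key);
  };
  return producer;
}

interceptProduce(fakeProducer, serializeForTopic);
fakeProducer.produce('test', -1, { name: 'John' }, 'key');
console.log(Buffer.isBuffer(fakeProducer.sent[0].message));
```

The caller keeps using produce() with a plain object, while the underlying producer only ever receives a Buffer.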
NOTICE: You need to initialize kafka-avro before you can produce or consume messages.
By invoking the kafkaAvro.getConsumer() method, kafka-avro will instantiate a Consumer, listen on log, error and disconnect events and return it to you. Depending on the consuming pattern you follow, you may or may not need to perform a connect().
When consuming topics using the data event, you will need to perform a connect() as per the node-rdkafka documentation:
kafkaAvro.getConsumer({
  'group.id': 'librd-test',
  'socket.keepalive.enable': true,
  'enable.auto.commit': true,
})
  // The "getConsumer()" method returns a Bluebird promise.
  .then(function(consumer) {
    // Perform a consumer.connect()
    return new Promise(function (resolve, reject) {
      consumer.on('ready', function() {
        resolve(consumer);
      });

      consumer.connect({}, function(err) {
        if (err) {
          reject(err);
          return;
        }
        // Safe to call again: a promise settles only once.
        resolve(consumer);
      });
    });
  })
  .then(function(consumer) {
    // Subscribe and consume.
    var topicName = 'test';
    consumer.subscribe([topicName]);
    consumer.consume();
    consumer.on('data', function(rawData) {
      console.log('data:', rawData);
    });
  });
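The connect step above calls resolve() from both the ready event handler and the connect() callback. Here is a minimal sketch of why that is safe, using a hypothetical FakeConsumer built on Node's EventEmitter in place of a real node-rdkafka consumer. It assumes the ready event fires before the connect callback, which may not match the real ordering.

```javascript
const { EventEmitter } = require('events');

// Hypothetical stand-in for a consumer: emits 'ready', then invokes the callback.
class FakeConsumer extends EventEmitter {
  connect(options, callback) {
    this.emit('ready');
    callback(null);
  }
}

// Records every attempt to resolve, in order.
const attempts = [];

function connectOnce(consumer) {
  return new Promise(function (resolve, reject) {
    consumer.on('ready', function () {
      attempts.push('ready event');
      resolve('ready event');
    });
    consumer.connect({}, function (err) {
      if (err) {
        reject(err);
        return;
      }
      attempts.push('connect callback');
      resolve('connect callback'); // no effect: the promise is already settled
    });
  });
}

const connected = connectOnce(new FakeConsumer());
connected.then(function (winner) {
  console.log('resolved by:', winner, '| attempts:', attempts.length);
});
```

Both code paths attempt to resolve, but only the first call determines the promise's value, so downstream handlers run once.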
kafkaAvro.getConsumerStream({
  'group.id': 'librd-test',
  'socket.keepalive.enable': true,
  'enable.auto.commit': true,
},
{
  'request.required.acks': 1
},
{
  'topics': 'test'
})
  .then(function(stream) {
    stream.on('error', function(err) {
      if (err) console.log(err);
      process.exit(1);
    });

    stream.on('data', function (rawData) {
      console.log('data:', rawData);
    });

    stream.consumer.on('event.error', function(err) {
      console.log(err);
    });
  });
As with the producer, this is a thin wrapper around node-rdkafka that deserializes incoming messages before they reach your consuming method.
kafka-avro intercepts all incoming messages and augments the message object with two more properties named parsed and parsedKey, which contain the Avro-deserialized value and key. Here is a breakdown of the properties included in the message object you receive when consuming messages:
value (Buffer): The raw message buffer from Kafka.
size (Number): The size of the message.
key (String|Number): Partitioning key used.
topic (String): The topic this message comes from.
offset (Number): The Kafka offset.
partition (Number): The Kafka partition used.
parsed (Object): The Avro-deserialized value as a JS object literal.
schemaId (Number): The Registry value schema id of the consumed message.
parsedKey (Object): The Avro-deserialized key as a JS object literal.
schemaIdKey (Number): The Registry key schema id of the consumed message.
The KafkaAvro instance also provides the following methods:
The Kafka Avro library logs messages using the Bunyan logger. To enable logging you will have to define at least one of the needed ENV variables:
KAFKA_AVRO_LOG_LEVEL: Set it to a valid Bunyan log level value to activate console logging (typically you'd need either info or debug as values).
KAFKA_AVRO_LOG_NO_COLORS: Set this to any value to disable color when logging.
WARNING: The logger currently does not emit messages as expected; there is an open issue on Bunyan's repository pending a solution. So no logging for now.
NOTICE: getLogger() is a static method on the KafkaAvro constructor, not the instance. Therefore there is a single logger instance for the whole runtime.
Returns {Bunyan.Logger} Bunyan logger instance.
var KafkaAvro = require('kafka-avro');
var fmt = require('bunyan-format');

var kafkaLog = KafkaAvro.getLogger();

kafkaLog.addStream({
  type: 'stream',
  stream: fmt({
    outputMode: 'short',
    levelInString: true,
  }),
  level: 'info',
});
Read more about the bunyan-format package.
serialize(type, schemaId, value)
Serialize the provided value with Avro, including the magic byte and the provided schema id.
Returns {Buffer} Serialized buffer message.
type {avsc.Type} The Avro type instance.
schemaId {number} The schema id in the Schema Registry.
value {*} Any value to serialize.
deserialize(type, message)
Deserialize the provided message; expects a message that includes the magic byte and schema id.
Returns {*} Deserialized message.
type {avsc.Type} The Avro type instance.
message {Buffer} The message as a byte buffer.
You can run docker-compose up to bring up the whole stack before running the integration tests with npm test. Because the integration tests run outside the containers, you will need to add the following entry to your hosts file:
127.0.0.1 kafka
grunt release
grunt release:minor for minor number jump.
grunt release:major for major number jump.
librdkafka v1.1.0
shouldFailWhenSchemaIsMissing to let the producer fail when no schema could be found (instead of producing as JSON) (by bfncs)
fetchAllVersions feature (by ricardohbin)
schemaMeta for key schemas also (by eparreno)
fetchRefreshRate parameter, to set a way to update the schemas after the app initialization. (by ricardohbin)
schemaId property on the consumed messages.
connect() method manually, check the docs.
connect() callbacks for both Consumer and Producer.
0.7.0-ALPHA.3 which changes the consumer API by decoupling subscribing from consuming.
0.7.0-ALPHA.2 of node-rdkafka which broke BC in 0.7.0-ALPHA.3.
connect() invocation for consumers and producers.
KafkaAvro.getLogger().
getReadStream() method.
serialize() and deserialize() methods.
getConsumer() method.
Copyright Waldo, Inc. Licensed under the MIT license.