kafka-avro - npm package: comparing version 0.1.3 to 0.2.0

lib/magic-byte.js

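The hunks for lib/magic-byte.js are not expanded in this view. For context: the rest of the diff calls this module as `magicByte.toMessageBuffer(value, type, schemaId)` and `magicByte.fromMessageBuffer(type, buffer)`, which implement Confluent's wire format: a zero "magic" byte, a 4-byte big-endian schema id, then the Avro-encoded payload. A minimal sketch of such helpers, assuming avsc's `type.toBuffer`/`type.fromBuffer`; the bodies are illustrative, not the file's actual contents:

var MAGIC_BYTE = 0;

// Encode: [0x0][4-byte big-endian schema id][avro payload]
function toMessageBuffer(value, type, schemaId) {
  var payload = type.toBuffer(value); // avsc binary encoding
  var header = new Buffer(5);
  header.writeUInt8(MAGIC_BYTE, 0);
  header.writeUInt32BE(schemaId, 1);
  return Buffer.concat([header, payload]);
}

// Decode: verify the magic byte, then split off the id and payload.
function fromMessageBuffer(type, buffer) {
  if (buffer.readUInt8(0) !== MAGIC_BYTE) {
    throw new Error('Message not serialized with magic byte');
  }
  return {
    schemaId: buffer.readUInt32BE(1),
    value: type.fromBuffer(buffer.slice(5)),
  };
}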

lib/kafka-avro.js

@@ -26,2 +26,4 @@ /*

function noop() {}
/**

@@ -34,2 +36,3 @@ * @fileOverview bootstrap and master exporting module.

*
* @param {Object} opts The options.
* @constructor

@@ -44,2 +47,10 @@ */

/** @type {boolean} Indicates if we serialize/deserialize with MAGIC BYTE */
this.hasMagicByte = !!opts.hasMagicByte;
/** @type {Array.<node-rdkafka.Producer>} Instantiated producers. */
this._producers = [];
/** @type {Array.<node-rdkafka.Consumer>} Instantiated consumers. */
this._consumers = [];
/**

@@ -60,2 +71,15 @@ * A dict containing all the value schemas with key the bare topic name and

this.keySchemas = {};
/**
* A dict containing all the value schemas metadata, with key the bare
* topic name and value the SR response on that topic:
*
* 'subject' {string} The full topic name, including the '-value' suffix.
* 'version' {number} The version number of the schema.
* 'id' {number} The schema id.
* 'schema' {string} JSON serialized schema.
*
* @type {Object}
*/
this.schemaMeta = {};
});
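For illustration, after `init()` each `schemaMeta` entry mirrors the Schema Registry response for a topic. A hypothetical example (topic name, id, and schema are invented):

// Hypothetical shape of one entry after init():
kafkaAvro.schemaMeta['test-topic'] = {
  subject: 'test-topic-value', // full subject, '-value' suffix included
  version: 1,
  id: 21, // the id embedded in the magic-byte header
  schema: '{"type":"record","name":"Log","fields":[{"name":"msg","type":"string"}]}',
};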

@@ -89,2 +113,27 @@

/**
* Dispose the instance, disconnecting all consumers and producers.
*
* @return {Promise} A Promise.
*/
KafkaAvro.prototype.dispose = Promise.method(function () {
var disconnectPromises = [];
this._consumers.forEach(function(consumer) {
var discon = Promise.promisify(consumer.disconnect.bind(consumer));
var disconProm = discon().catch(noop);
disconnectPromises.push(disconProm);
});
this._producers.forEach(function(producer) {
var discon = Promise.promisify(producer.disconnect.bind(producer));
var disconProm = discon().catch(noop);
disconnectPromises.push(disconProm);
});
this.hasMagicByte = false;
return Promise.all(disconnectPromises);
});
/**
* Fetch all registered schema topics from SR.

@@ -125,2 +174,3 @@ *

return {
responseRaw: response.data,
schemaType: schemaType,

@@ -149,6 +199,9 @@ schemaTopicRaw: schemaTopic,

ex.message);
return schemaObj;
}
if (schemaObj.schemaType.toLowerCase() === 'value') {
this.valueSchemas[schemaObj.topic] = schemaObj.type;
this.schemaMeta[schemaObj.topic] = schemaObj.responseRaw;
} else {

@@ -155,0 +208,0 @@ this.keySchemas[schemaObj.topic] = schemaObj.type;


lib/kafka-consumer.js

@@ -10,2 +10,4 @@ /**

var magicByte = require('./magic-byte');
/**

@@ -34,2 +36,4 @@ * Wrapper for node-rdkafka Consumer Ctor, a mixin.

this._consumers.push(consumer);
consumer.on('event.log', function(log) {

@@ -74,2 +78,6 @@ console.log('node-rdkafka log:', log);

stream.on('error', function(err) {
console.error('KafkaAvro.Consumer :: Read Stream Error:', err);
});
return stream

@@ -106,3 +114,8 @@ .pipe(new Transform({

- message.parsed = this.valueSchemas[message.topic].fromBuffer(message.value);
+ var type = this.valueSchemas[message.topic];
+ if (this.hasMagicByte) {
+   message.parsed = magicByte.fromMessageBuffer(type, message.value).value;
+ } else {
+   message.parsed = type.fromBuffer(message.value);
+ }

@@ -113,3 +126,11 @@ cb(message);

/**
* Callback for the Stream Transform; deserializes the message.
*
* @param {string} topicName The topic name.
* @param {*} data The message to encode.
* @param {string=} encoding Encoding of the message.
* @param {Function} callback Callback to call when done.
* @private
*/
Consumer.prototype._transformAvro = function (topicName, data, encoding, callback) {

@@ -124,4 +145,12 @@ if (!this.valueSchemas[topicName]) {

}
- data.parsed = this.valueSchemas[topicName].fromBuffer(data.value);
+ var type = this.valueSchemas[topicName];
+ console.log('consuming STREAM topic:', data.topic, 'has byte:', this.hasMagicByte);
+ if (this.hasMagicByte) {
+   data.parsed = magicByte.fromMessageBuffer(type, data.value).value;
+ } else {
+   data.parsed = type.fromBuffer(data.value);
+ }
callback(null, data);
};

lib/kafka-producer.js

@@ -8,2 +8,4 @@ /**

var magicByte = require('./magic-byte');
/**

@@ -34,2 +36,4 @@ * Wrapper for node-rdkafka Producer Ctor, a mixin.

this._producers.push(producer);
// hack node-rdkafka

@@ -56,5 +60,6 @@ producer.__kafkaAvro_produce = producer.produce;

* @param {string|number} key The partitioning key.
* @param {*=} optOpaque Pass vars to receipt handler.
*/
Producer.prototype._produceWrapper = function (producerInstance, kafkaTopic,
- partition, value, key) {
+ partition, value, key, optOpaque) {

@@ -67,8 +72,19 @@ var topicName = kafkaTopic.name();

var bufVal = new Buffer(JSON.stringify(value));
- return producerInstance.__kafkaAvro_produce(kafkaTopic, partition, bufVal, key);
+ return producerInstance.__kafkaAvro_produce(kafkaTopic, partition, bufVal,
+   key, optOpaque);
}
- var bufValue = this.valueSchemas[topicName].toBuffer(value);
- return producerInstance.__kafkaAvro_produce(kafkaTopic, partition, bufValue, key);
+ var type = this.valueSchemas[topicName];
+ var schemaId = this.schemaMeta[topicName].id;
+ var bufValue;
+ if (this.hasMagicByte) {
+   bufValue = magicByte.toMessageBuffer(value, type, schemaId);
+ } else {
+   bufValue = type.toBuffer(value);
+ }
+ return producerInstance.__kafkaAvro_produce(kafkaTopic, partition, bufValue,
+   key, optOpaque);
};
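For context on the `__kafkaAvro_produce` indirection above (the "hack node-rdkafka" comment): kafka-avro keeps a reference to the original node-rdkafka `produce` and swaps in a serializing wrapper. A minimal sketch of the pattern, inferred from `_produceWrapper`'s signature rather than taken from the package:

// Keep the original produce, then replace it with a wrapper that
// Avro-serializes the value before delegating.
producer.__kafkaAvro_produce = producer.produce;
producer.produce = this._produceWrapper.bind(this, producer);
// _produceWrapper(producerInstance, kafkaTopic, partition, value, key, optOpaque)
// then serializes `value` and calls producerInstance.__kafkaAvro_produce(...).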


package.json
{
"name": "kafka-avro",
"version": "0.1.3",
"version": "0.2.0",
"main": "./lib/kafka-avro",

@@ -24,3 +24,3 @@ "description": "Node.js bindings for librdkafka with Avro schema serialization.",

"scripts": {
"test": "mocha -b test/spec && eslint lib/**"
"test": "eslint lib && mocha -b test/spec && eslint lib/**"
},

@@ -27,0 +27,0 @@ "dependencies": {

README.md

@@ -5,2 +5,4 @@ # kafka-avro

[![CircleCI](https://circleci.com/gh/waldophotos/kafka-avro/tree/master.svg?style=svg)](https://circleci.com/gh/waldophotos/kafka-avro/tree/master)
The kafka-avro library is a wrapper that combines the [node-rdkafka][node-rdkafka] and [avsc][avsc] libraries to allow producing and consuming messages on Kafka that are validated and serialized with Avro.

@@ -46,2 +48,3 @@

schemaRegistry: 'localhost:8081',
hasMagicByte: true,
});

@@ -57,2 +60,10 @@

### Kafka-avro options
When instantiating kafka-avro you may pass the following options:
* `kafkaBroker` **String REQUIRED** A URL or comma-delimited list of URLs pointing to your Kafka brokers.
* `schemaRegistry` **String REQUIRED** The URL of the Schema Registry.
* `hasMagicByte` **Boolean** *Default*: `false` Enable this for Confluent Schema Registry Magic Byte insertion.
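Putting the options together, a minimal instantiation sketch (the broker and registry addresses are placeholders):

var KafkaAvro = require('kafka-avro');

var kafkaAvro = new KafkaAvro({
  kafkaBroker: 'localhost:9092',
  schemaRegistry: 'localhost:8081',
  hasMagicByte: true,
});

// init() fetches all registered schemas from the Schema Registry
// and resolves once producers and consumers can safely be created.
kafkaAvro.init()
  .then(function () {
    console.log('kafka-avro ready');
  });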
### Producer

@@ -62,3 +73,3 @@

- By inoking the `kafkaAvro.getProducer()` method, kafka-avro will instantiate a Producer, make it connect and wait for it to be ready before the promise is resolved.
+ By invoking the `kafkaAvro.getProducer()` method, kafka-avro will instantiate a Producer, make it connect and wait for it to be ready before the promise is resolved.
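A sketch of that flow, based on the test code later in this diff; the topic name and payload are hypothetical, and `producer.Topic()` is assumed to be the node-rdkafka topic-handle API of this era:

kafkaAvro.getProducer()
  .then(function (producer) {
    var topic = producer.Topic('test-topic', {});
    // -1 lets librdkafka pick the partition; the value object is
    // Avro-serialized (and magic-byte framed when hasMagicByte is on).
    producer.produce(topic, -1, { msg: 'hello' }, 'key');
  });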

@@ -181,2 +192,4 @@ ```js

- **v0.2.0**, *30 Jan 2016*
- Added Confluent's Magic Byte support when encoding and decoding messages.
- **v0.1.2**, *27 Jan 2016*

@@ -183,0 +196,0 @@ - Suppress schema parsing errors.

@@ -24,8 +24,8 @@ /*

testLib.init = function() {
- if (testBoot) {
-   return;
- }
- testBoot = true;
- beforeEach(function() {
+ if (testBoot) {
+   return;
+ }
+ testBoot = true;
+ beforeEach(function() {
return Promise.all([

@@ -52,2 +52,5 @@ testLib.registerSchema(testLib.topic, schemaFix),

};
console.log('TEST :: Registering schema:', topic, 'on SR:', schemaCreateUrl);
return axios({

@@ -54,0 +57,0 @@ url: schemaCreateUrl,

/**
* @fileOverview Test produce and consume messages using kafka-avro.
*/
var crypto = require('crypto');

@@ -11,2 +12,3 @@ var chai = require('chai');

function noop () {}

@@ -27,4 +29,5 @@ describe('Consume', function() {

this.consOpts = {
- 'group.id': 'testKafkaAvro',
+ 'group.id': 'testKafkaAvro' + crypto.randomBytes(20).toString('hex'),
'enable.auto.commit': true,
// 'auto.offset.reset': 'earliest',
// 'session.timeout.ms': 1000,

@@ -36,2 +39,5 @@ };

this.consumer = consumer;
this.consumer.on('error', function(err) {
console.log('consumerError:', err);
});
});

@@ -74,9 +80,7 @@ });

+ afterEach(function() {
+   return this.kafkaAvro.dispose();
+ });
describe('Consumer direct "on"', function() {
- afterEach(function(done) {
-   this.consumer.disconnect(function() {
-     done();
-   });
- });
it('should produce and consume a message using consume "on"', function(done) {

@@ -110,12 +114,12 @@ var produceTime = 0;

this.producer.produce(this.producerTopic, -1, message, 'key');
- }, 2000);
+ }, 4000);
- //need to keep polling for a while to ensure the delivery reports are received
- var pollLoop = setInterval(function () {
-   this.producer.poll();
-   if (this.gotReceipt) {
-     clearInterval(pollLoop);
-     this.producer.disconnect();
-   }
- }.bind(this), 1000);
+ // //need to keep polling for a while to ensure the delivery reports are received
+ // var pollLoop = setInterval(function () {
+ //   this.producer.poll();
+ //   if (this.gotReceipt) {
+ //     clearInterval(pollLoop);
+ //     this.producer.disconnect();
+ //   }
+ // }.bind(this), 1000);
});

@@ -141,3 +145,2 @@

this.consumer.on('data', function(rawData) {
- console.log('GOT:', rawData);
if (rawData.topic === testLib.topic) {

@@ -169,13 +172,3 @@ receivedOne = true;

}, 2000);
- //need to keep polling for a while to ensure the delivery reports are received
- var pollLoop = setInterval(function () {
-   this.producer.poll();
-   if (this.gotReceipt) {
-     clearInterval(pollLoop);
-     this.producer.disconnect();
-   }
- }.bind(this), 1000);
});
});

@@ -195,2 +188,3 @@

});
stream.on('error', noop);

@@ -212,3 +206,2 @@ stream.on('data', function(dataRaw) {

setTimeout(() => {

@@ -218,12 +211,4 @@ produceTime = Date.now();

}, 2000);
- //need to keep polling for a while to ensure the delivery reports are received
- var pollLoop = setInterval(function () {
-   this.producer.poll();
-   if (this.gotReceipt) {
-     clearInterval(pollLoop);
-     this.producer.disconnect();
-   }
- }.bind(this), 1000);
});
});
});

@@ -10,3 +10,2 @@ /**

describe('Produce', function() {

@@ -78,3 +77,4 @@ testLib.init();

- var binded = this.producer.produce.bind(this.producer, this.producerTopic, -1, message, 'key');
+ var binded = this.producer.produce.bind(this.producer, this.producerTopic,
+   -1, message, 'key');

@@ -81,0 +81,0 @@ expect(binded).to.throw(Error);

@@ -9,5 +9,6 @@ /**

- var srUrl = 'http://schema-registry-confluent.internal.dev.waldo.photos';
+ // var srUrl = 'http://schema-registry-confluent.internal.dev.waldo.photos';
+ var srUrl = 'http://localhost:8081';
- describe('Initialization of RS', function() {
+ describe('Initialization of SR', function() {
it('should initialize properly', function() {

@@ -21,2 +22,3 @@ var kafkaAvro = new KafkaAvro({

expect(res).to.have.keys([
'responseRaw',
'schemaType',

@@ -28,2 +30,9 @@ 'topic',

]);
expect(res.responseRaw).to.have.keys([
'subject',
'version',
'id',
'schema',
]);
})

@@ -34,2 +43,29 @@ .then((all) => {

});
it('kafkaAvro instance should contain expected values after init', function() {
var kafkaAvro = new KafkaAvro({
schemaRegistry: srUrl,
});
return kafkaAvro.init()
.map((res) => {
if (res.schemaType.toLowerCase() === 'value') {
expect(kafkaAvro.valueSchemas[res.topic]).to.be.an('object');
expect(kafkaAvro.schemaMeta[res.topic]).to.be.an('object');
expect(kafkaAvro.schemaMeta[res.topic]).to.have.keys([
'subject',
'version',
'id',
'schema',
]);
} else {
expect(kafkaAvro.keySchemas[res.topic]).to.be.an('object');
}
})
.then((all) => {
expect(all).to.have.length.of.at.least(1);
});
});
});