Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

kafka-avro

Package Overview
Dependencies
Maintainers
1
Versions
46
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

kafka-avro - npm Package Compare versions

Comparing version 0.0.1 to 0.1.0

test/spec/consumer.test.js

78

lib/kafka-consumer.js

@@ -24,3 +24,3 @@ /**

*/
Consumer.prototype.getConsumer = Promise.method(function (opts) {
Consumer.prototype.getConsumer = Promise.method(function (topicName, opts) {
if (!opts['metadata.broker.list']) {

@@ -30,4 +30,10 @@ opts['metadata.broker.list'] = this.kafkaBrokerUrl;

console.log('KafkaAvro :: Starting Consumer with opts:', opts);
var consumer = new kafka.KafkaConsumer(opts);
consumer.on('event.log', function(log) {
console.log('node-rdkafka log:', log);
});
// hack node-rdkafka

@@ -37,3 +43,12 @@ consumer.__kafkaAvro_getReadStream = consumer.getReadStream;

return consumer;
consumer.__kafkaAvro_on = consumer.on;
consumer.on = this._onWrapper.bind(this, consumer);
return new Promise(function(resolve) {
consumer.on('ready', function() {
resolve(consumer);
});
consumer.connect();
});
});

@@ -63,17 +78,48 @@

objectMode: true,
transform: function(data, encoding, callback) {
var value;
if (!this.valueSchemas[topic]) {
console.log('KafkaAvro :: Warning, consumer did not find topic on SR:',
topic);
value = JSON.parse(data.toString('utf-8'));
callback(null, value);
return;
}
value = this.valueSchemas[topic].fromBuffer(data.message);
// We want to force it to run async
callback(null, value);
}
transform: this._transformAvro.bind(this, topic),
}));
};
/**
* The node-rdkafka on method wrapper, will intercept "data" events and
* deserialize the incoming message using the existing schemas.
*
* @param {kafka.KafkaConsumer} consumerInstance node-rdkafka instance.
* @param {string} eventName the name to listen for events on.
* @param {Function} cb Event callback.
* @private
*/
Consumer.prototype._onWrapper = function (consumerInstance, eventName, cb) {
if (eventName !== 'data') {
return consumerInstance.__kafkaAvro_on(eventName, cb);
}
consumerInstance.__kafkaAvro_on('data', function(message) {
if (!this.valueSchemas[message.topic]) {
console.log('KafkaAvro :: Warning, consumer did not find topic on SR:',
message.topic);
message.parsed = JSON.parse(message.toString('utf-8'));
cb(message);
return;
}
message.parsed = this.valueSchemas[message.topic].fromBuffer(message.value);
cb(message);
}.bind(this));
};
Consumer.prototype._transformAvro = function (topicName, data, encoding, callback) {
if (!this.valueSchemas[topicName]) {
console.log('KafkaAvro :: Warning, consumer did not find topic on SR:',
topicName);
data.parsed = JSON.parse(data.toString('utf-8'));
callback(null, data);
return;
}
data.parsed = this.valueSchemas[topicName].fromBuffer(data.value);
callback(null, data);
};

@@ -50,3 +50,2 @@ /**

* @param {kafka.Producer} producerInstance node-rdkafka instance.
* @param {string} topic Topic to produce on.
* @param {kafka.Producer.Topic} kafkaTopic node-rdkafka Topic instance.

@@ -57,14 +56,17 @@ * @param {number} partition The partition to produce on.

*/
Producer.prototype._produceWrapper = function (producerInstance, topic, kafkaTopic,
Producer.prototype._produceWrapper = function (producerInstance, kafkaTopic,
partition, value, key) {
if (!this.valueSchemas[topic]) {
var topicName = kafkaTopic.name();
if (!this.valueSchemas[topicName]) {
// topic not found in schemas, bail early
console.log('KafkaAvro :: Warning, did not find topic on SR:', topic);
console.log('KafkaAvro :: Warning, did not find topic on SR:', topicName);
var bufVal = new Buffer(JSON.stringify(value));
return producerInstance.__kafkaAvro_produce(topic, partition, bufVal, key);
return producerInstance.__kafkaAvro_produce(kafkaTopic, partition, bufVal, key);
}
var bufValue = this.valueSchemas[topic].toBuffer(value);
return producerInstance.__kafkaAvro_produce(topic, partition, bufValue, key);
var bufValue = this.valueSchemas[topicName].toBuffer(value);
return producerInstance.__kafkaAvro_produce(kafkaTopic, partition, bufValue, key);
};
{
"name": "kafka-avro",
"version": "0.0.1",
"version": "0.1.0",
"main": "./lib/kafka-avro",

@@ -5,0 +5,0 @@ "description": "Node.js bindings for librdkafka with Avro schema serialization.",

@@ -5,3 +5,3 @@ # kafka-avro

**WARNING** Still WIP, consumer not working!
The kafka-avro library is a wrapper that combines the [node-rdkafka][node-rdkafka] and [avsc](avsc) libraries to allow for Production and Consumption of messages on kafka validated and serialized by Avro.

@@ -18,5 +18,14 @@ ## Install

The kafka-avro library operates in the following steps:
1. You provide your Kafka Brokers and Schema Registry (SR) Url to a new instance of kafka-avro.
1. You initialize kafka-avro, that will tell the library to query the SR for all registered schemas, evaluate and store them in runtime memory.
1. kafka-avro will then expose the `getConsumer()` and `getProducer()` methods, which both return instsances of the corresponding Constructors from the [node-rdkafka][node-rdkafka] library.
The instances of "node-rdkafka" that are returned by kafka-avro are hacked so as to intercept produced and consumed messages and run them by the Avro de/serializer.
You are highly encouraged to read the ["node-rdkafka" documentation](https://blizzard.github.io/node-rdkafka/current/) on the way you can use the Producer and Consumer instances as well as check out the [available configuration options of node-rdkafka](https://github.com/edenhill/librdkafka/blob/2213fb29f98a7a73f22da21ef85e0783f6fd67c4/CONFIGURATION.md)
### Initialize kafka-avro
Initialize kafka-avro
```js

@@ -67,3 +76,3 @@ var KafkaAvro = require('kafka-avro');

var partition = -1;
producer.produce(topicName, topic, partition, value, key);
producer.produce(topic, partition, value, key);
});

@@ -86,2 +95,4 @@

> NOTICE: You need to initialize kafka-avro before you can produce or consume messages.
```js

@@ -106,15 +117,10 @@ var Transform = require('stream').Transform;

stream
.pipe(new Transform({
objectMode: true,
transform: function(data, encoding, callback) {
// do your async stuff, then:
callback(null, data.message);
}
}))
.pipe(process.stdout);
consumer.on('error', function(err) {
console.log(err);
process.exit(1);
});
stream.on('data', function(message) {
console.log('Received message:', message);
});
```

@@ -127,2 +133,14 @@

#### Consumer Data Object
kafka-avro intercepts all incoming messages and augments the object with one more property named `parsed` which contained the avro deserialized object. Here is a breakdown of the properties included in the `message` object you receive when consuming messages:
* `value` **Buffer** The raw message buffer from Kafka.
* `size` **Number** The size of the message.
* `key` **String|Number** Partioning key used.
* `topic` **String** The topic this message comes from.
* `offset` **Number** The Kafka offset.
* `partition` **Number** The kafka partion used.
* `parsed` **Object** The avro deserialized message as a JS Object ltieral.
## Releasing

@@ -138,2 +156,4 @@

- **v0.1.0**, *26 Jan 2016*
- First fully working release.
- **v0.0.1**, *25 Jan 2016*

@@ -145,1 +165,4 @@ - Big Bang

Copyright Waldo, Inc. All rights reserved.
[avsc]: https://github.com/mtth/avsc
[node-rdkafka]: https://github.com/Blizzard/node-rdkafka

@@ -5,2 +5,3 @@ /*

var axios = require('axios');
var Promise = require('bluebird');

@@ -11,10 +12,8 @@ var schemaFix = require('../fixtures/schema.fix');

testLib.KAFKA_SCHEMA_REGISTRY_URL = 'http://schema-registry-confluent.internal.dev.waldo.photos';
testLib.KAFKA_BROKER_URL = 'broker-1.service.consul:9092,broker-3.service.consul:9092,broker-2.service.consul:9092';
testLib.KAFKA_SCHEMA_REGISTRY_URL = 'http://localhost:8081';
testLib.KAFKA_BROKER_URL = 'localhost:9092';
testLib.topic = schemaFix.name;
testLib.topicTwo = schemaFix.name + 'Two';
// curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
// --data '{"schema": "{\"name\": \"string\", \"long\": \"long\"}"}' \
// http://schema-registry-confluent.internal.dev.waldo.photos/subjects/test-thanpolas-Kafka/versions
var testBoot = false;

@@ -33,30 +32,37 @@

beforeEach(function() {
var schemaCreateUrl = testLib.KAFKA_SCHEMA_REGISTRY_URL +
'/subjects/' + testLib.topic + '-value/versions';
var data = {
schema: JSON.stringify(schemaFix),
};
console.log('DATA:', data);
return axios({
url: schemaCreateUrl,
method: 'post',
headers: {
'Content-Type': 'application/vnd.schemaregistry.v1+json',
},
data: data,
})
.catch(function(err) {
console.error('Axios SR creation failed:', err);
throw err;
});
return Promise.all([
testLib.registerSchema(testLib.topic, schemaFix),
testLib.registerSchema(testLib.topicTwo, schemaFix),
]);
});
// # Register a new version of a schema under the subject "Kafka-value"
// $ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
// --data '{"schema": "{\"type\": \"string\"}"}' \
// http://localhost:8081/subjects/Kafka-value/versions
// {"id":1}
};
/**
* Register a schema on SR.
*
* @param {string} The topic.
* @param {schema} Object The schema to register.
* @return {Promise} A Promise.
*/
testLib.registerSchema = Promise.method(function(topic, schema) {
var schemaCreateUrl = testLib.KAFKA_SCHEMA_REGISTRY_URL +
'/subjects/' + topic + '-value/versions';
var data = {
schema: JSON.stringify(schema),
};
return axios({
url: schemaCreateUrl,
method: 'post',
headers: {
'Content-Type': 'application/vnd.schemaregistry.v1+json',
},
data: data,
})
.catch(function(err) {
console.error('Axios SR creation failed:', err);
throw err;
});
});
/** @type {Object} simple logger */

@@ -63,0 +69,0 @@ testLib.log = {

@@ -10,2 +10,3 @@ /**

describe('Produce', function() {

@@ -24,17 +25,6 @@ testLib.init();

beforeEach(function() {
return this.kafkaAvro.getConsumer({
'group.id': 'kafka-avro-test',
'socket.keepalive.enable': true,
'enable.auto.commit': true,
})
.then((consumer) => {
this.consumer = consumer;
});
});
beforeEach(function() {
return this.kafkaAvro.getProducer({
'dr_cb': true,
})
.then((producer) => {
.then(function (producer) {
this.producer = producer;

@@ -51,6 +41,5 @@

producer.on('delivery-report', function(report) {
console.log('delivery-report:' + JSON.stringify(report));
producer.on('delivery-report', function() {
this.gotReceipt = true;
});
}.bind(this));

@@ -61,7 +50,11 @@ this.producerTopic = producer.Topic(testLib.topic, {

});
});
}.bind(this));
});
afterEach(function(done) {
this.producer.disconnect(function(err) {
done(err);
});
});
it.only('should produce and consume a message', function(done) {
console.log('test start');
it('should produce a message', function(done) {
var message = {

@@ -71,36 +64,4 @@ name: 'Thanasis',

};
this.producer.produce(this.producerTopic, -1, message, 'key');
var stream = this.consumer.getReadStream(testLib.topic, {
waitInterval: 0
});
console.log('stream:', typeof stream.pipe);
stream.on('error', function(err) {
console.error('FATAL Stream error:', err);
done(err);
});
this.consumer.on('error', function(err) {
console.error('Consumer Error:', err);
});
// stream
// .pipe((data) => {
// console.log('GOT:', data);
// expect(data).to.deep.equal(message);
// this.consumer.disconnect();
// done();
// });
stream.on('data', (data) => {
console.log('GOT:', data);
expect(data).to.deep.equal(message);
this.consumer.disconnect();
done();
});
console.log('producing...');
// Produce message
this.producer.produce(testLib.topic, this.producerTopic, -1, message, 'key');
//need to keep polling for a while to ensure the delivery reports are received

@@ -111,7 +72,26 @@ var pollLoop = setInterval(() => {

clearInterval(pollLoop);
this.producer.disconnect();
done();
}
}, 1000);
});
it('should not allow invalid type', function() {
var message = {
name: 'Thanasis',
long: '540',
};
var binded = this.producer.produce.bind(this.producer, this.producerTopic, -1, message, 'key');
expect(binded).to.throw(Error);
});
it('should not allow less attributes', function() {
var message = {
name: 'Thanasis',
};
var binded = this.producer.produce.bind(this.producer, this.producerTopic, -1, message, 'key');
expect(binded).to.throw(Error);
});
});

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc