kafka-avro
Comparing version 0.0.1 to 0.1.0
@@ -24,3 +24,3 @@ /**
 */
Consumer.prototype.getConsumer = Promise.method(function (opts) {
Consumer.prototype.getConsumer = Promise.method(function (topicName, opts) {
  if (!opts['metadata.broker.list']) {
@@ -30,4 +30,10 @@ opts['metadata.broker.list'] = this.kafkaBrokerUrl;
  console.log('KafkaAvro :: Starting Consumer with opts:', opts);
  var consumer = new kafka.KafkaConsumer(opts);
  consumer.on('event.log', function(log) {
    console.log('node-rdkafka log:', log);
  });
  // hack node-rdkafka
@@ -37,3 +43,12 @@ consumer.__kafkaAvro_getReadStream = consumer.getReadStream;
  return consumer;
  consumer.__kafkaAvro_on = consumer.on;
  consumer.on = this._onWrapper.bind(this, consumer);
  return new Promise(function(resolve) {
    consumer.on('ready', function() {
      resolve(consumer);
    });
    consumer.connect();
  });
});
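With this change, `getConsumer()` no longer returns the consumer synchronously: it installs the interceptors, calls `connect()`, and resolves a Promise once the `ready` event fires. A minimal usage sketch, assuming an already-initialized `kafkaAvro` instance; the topic name is illustrative, and the option values come from this package's test suite:

```js
kafkaAvro.getConsumer('test-topic', {
  'group.id': 'kafka-avro-test',
  'socket.keepalive.enable': true,
  'enable.auto.commit': true,
})
  .then(function (consumer) {
    // The consumer is connected and "ready" by the time the Promise resolves.
    console.log('KafkaAvro consumer ready');
  });
```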
@@ -63,17 +78,48 @@
    objectMode: true,
    transform: function(data, encoding, callback) {
      var value;
      if (!this.valueSchemas[topic]) {
        console.log('KafkaAvro :: Warning, consumer did not find topic on SR:',
          topic);
        value = JSON.parse(data.toString('utf-8'));
        callback(null, value);
        return;
      }
      value = this.valueSchemas[topic].fromBuffer(data.message);
      // We want to force it to run async
      callback(null, value);
    }
    transform: this._transformAvro.bind(this, topic),
  }));
};
/**
 * The node-rdkafka `on` method wrapper; intercepts "data" events and
 * deserializes the incoming message using the existing schemas.
 *
 * @param {kafka.KafkaConsumer} consumerInstance node-rdkafka instance.
 * @param {string} eventName the name to listen for events on.
 * @param {Function} cb Event callback.
 * @private
 */
Consumer.prototype._onWrapper = function (consumerInstance, eventName, cb) {
  if (eventName !== 'data') {
    return consumerInstance.__kafkaAvro_on(eventName, cb);
  }
  consumerInstance.__kafkaAvro_on('data', function(message) {
    if (!this.valueSchemas[message.topic]) {
      console.log('KafkaAvro :: Warning, consumer did not find topic on SR:',
        message.topic);
      message.parsed = JSON.parse(message.toString('utf-8'));
      cb(message);
      return;
    }
    message.parsed = this.valueSchemas[message.topic].fromBuffer(message.value);
    cb(message);
  }.bind(this));
};
Consumer.prototype._transformAvro = function (topicName, data, encoding, callback) {
  if (!this.valueSchemas[topicName]) {
    console.log('KafkaAvro :: Warning, consumer did not find topic on SR:',
      topicName);
    data.parsed = JSON.parse(data.toString('utf-8'));
    callback(null, data);
    return;
  }
  data.parsed = this.valueSchemas[topicName].fromBuffer(data.value);
  callback(null, data);
};
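In effect, every `data` listener registered on a consumer returned by `getConsumer()` now receives the raw node-rdkafka message augmented with a `parsed` property. A sketch of what a handler sees (the handler body is illustrative):

```js
consumer.on('data', function (message) {
  // message.value is the raw Buffer as delivered by node-rdkafka;
  // message.parsed is the Avro-deserialized object added by _onWrapper.
  console.log('topic:', message.topic, 'offset:', message.offset);
  console.log('decoded:', message.parsed);
});
```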
@@ -50,3 +50,2 @@ /**
 * @param {kafka.Producer} producerInstance node-rdkafka instance.
 * @param {string} topic Topic to produce on.
 * @param {kafka.Producer.Topic} kafkaTopic node-rdkafka Topic instance.
@@ -57,14 +56,17 @@ * @param {number} partition The partition to produce on.
 */
Producer.prototype._produceWrapper = function (producerInstance, topic, kafkaTopic,
Producer.prototype._produceWrapper = function (producerInstance, kafkaTopic,
  partition, value, key) {
  if (!this.valueSchemas[topic]) {
  var topicName = kafkaTopic.name();
  if (!this.valueSchemas[topicName]) {
    // topic not found in schemas, bail early
    console.log('KafkaAvro :: Warning, did not find topic on SR:', topic);
    console.log('KafkaAvro :: Warning, did not find topic on SR:', topicName);
    var bufVal = new Buffer(JSON.stringify(value));
    return producerInstance.__kafkaAvro_produce(topic, partition, bufVal, key);
    return producerInstance.__kafkaAvro_produce(kafkaTopic, partition, bufVal, key);
  }
  var bufValue = this.valueSchemas[topic].toBuffer(value);
  return producerInstance.__kafkaAvro_produce(topic, partition, bufValue, key);
  var bufValue = this.valueSchemas[topicName].toBuffer(value);
  return producerInstance.__kafkaAvro_produce(kafkaTopic, partition, bufValue, key);
};
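The produce wrapper now takes the node-rdkafka `Topic` instance instead of a topic-name string, and derives the name for the schema lookup via `kafkaTopic.name()`. A sketch of the resulting call pattern, mirroring the test suite further down (the topic name and topic-level option here are illustrative; the message fields come from the tests):

```js
var kafkaTopic = producer.Topic('test-topic', {
  // hypothetical topic-level option; see librdkafka's CONFIGURATION.md
  'request.required.acks': 1,
});

// The value is serialized with the topic's Avro schema (toBuffer) before produce.
producer.produce(kafkaTopic, -1, { name: 'Thanasis', long: 540 }, 'key');
```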
{
  "name": "kafka-avro",
  "version": "0.0.1",
  "version": "0.1.0",
  "main": "./lib/kafka-avro",
@@ -5,0 +5,0 @@ "description": "Node.js bindings for librdkafka with Avro schema serialization.",
@@ -5,3 +5,3 @@ # kafka-avro
**WARNING** Still WIP, consumer not working!
The kafka-avro library is a wrapper that combines the [node-rdkafka][node-rdkafka] and [avsc][avsc] libraries to allow for production and consumption of messages on Kafka, validated and serialized by Avro.
@@ -18,5 +18,14 @@ ## Install
The kafka-avro library operates in the following steps:
1. You provide your Kafka Brokers and Schema Registry (SR) Url to a new instance of kafka-avro.
1. You initialize kafka-avro, which tells the library to query the SR for all registered schemas, evaluate them, and store them in runtime memory.
1. kafka-avro will then expose the `getConsumer()` and `getProducer()` methods, which both return instances of the corresponding constructors from the [node-rdkafka][node-rdkafka] library.
The instances of "node-rdkafka" that are returned by kafka-avro are hacked so as to intercept produced and consumed messages and run them through the Avro de/serializer.
You are highly encouraged to read the ["node-rdkafka" documentation](https://blizzard.github.io/node-rdkafka/current/) on the way you can use the Producer and Consumer instances, as well as check out the [available configuration options of node-rdkafka](https://github.com/edenhill/librdkafka/blob/2213fb29f98a7a73f22da21ef85e0783f6fd67c4/CONFIGURATION.md).
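Putting those steps together, a minimal end-to-end sketch. The constructor option names are assumptions, as is the `init()` method name (neither is shown in this diff); the rest follows the API described above:

```js
var KafkaAvro = require('kafka-avro');

var kafkaAvro = new KafkaAvro({
  kafkaBroker: 'localhost:9092',           // assumed option name
  schemaRegistry: 'http://localhost:8081', // assumed option name
});

// Query the SR, evaluate all registered schemas and store them in memory.
kafkaAvro.init()
  .then(function () {
    // From here on, getProducer() / getConsumer() return node-rdkafka
    // instances wired up with Avro de/serialization.
    return kafkaAvro.getProducer({ 'dr_cb': true });
  });
```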
### Initialize kafka-avro
Initialize kafka-avro
```js
@@ -67,3 +76,3 @@ var KafkaAvro = require('kafka-avro');
var partition = -1;
producer.produce(topicName, topic, partition, value, key);
producer.produce(topic, partition, value, key);
});
@@ -86,2 +95,4 @@
> NOTICE: You need to initialize kafka-avro before you can produce or consume messages.
```js
@@ -106,15 +117,10 @@ var Transform = require('stream').Transform;
stream
  .pipe(new Transform({
    objectMode: true,
    transform: function(data, encoding, callback) {
      // do your async stuff, then:
      callback(null, data.message);
    }
  }))
  .pipe(process.stdout);
consumer.on('error', function(err) {
  console.log(err);
  process.exit(1);
});
stream.on('data', function(message) {
  console.log('Received message:', message);
});
```
@@ -127,2 +133,14 @@
#### Consumer Data Object
kafka-avro intercepts all incoming messages and augments the object with one more property named `parsed`, which contains the Avro-deserialized object. Here is a breakdown of the properties included in the `message` object you receive when consuming messages (a usage sketch follows the list):
* `value` **Buffer** The raw message buffer from Kafka.
* `size` **Number** The size of the message.
* `key` **String|Number** Partitioning key used.
* `topic` **String** The topic this message comes from.
* `offset` **Number** The Kafka offset.
* `partition` **Number** The Kafka partition used.
* `parsed` **Object** The Avro-deserialized message as a JS Object literal.
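A sketch of reading those properties off the stream API; the `waitInterval` value mirrors this package's tests, while the topic name is illustrative:

```js
var stream = consumer.getReadStream('test-topic', { waitInterval: 0 });

stream.on('data', function (message) {
  console.log('raw bytes:', message.value.length, '=== size:', message.size);
  console.log('origin:', message.topic, 'partition:', message.partition);
  console.log('decoded payload:', message.parsed);
});
```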
## Releasing
@@ -138,2 +156,4 @@
- **v0.1.0**, *26 Jan 2016*
  - First fully working release.
- **v0.0.1**, *25 Jan 2016*
@@ -145,1 +165,4 @@ - Big Bang
Copyright Waldo, Inc. All rights reserved.
[avsc]: https://github.com/mtth/avsc
[node-rdkafka]: https://github.com/Blizzard/node-rdkafka
@@ -5,2 +5,3 @@ /*
var axios = require('axios');
var Promise = require('bluebird');
@@ -11,10 +12,8 @@ var schemaFix = require('../fixtures/schema.fix');
testLib.KAFKA_SCHEMA_REGISTRY_URL = 'http://schema-registry-confluent.internal.dev.waldo.photos';
testLib.KAFKA_BROKER_URL = 'broker-1.service.consul:9092,broker-3.service.consul:9092,broker-2.service.consul:9092';
testLib.KAFKA_SCHEMA_REGISTRY_URL = 'http://localhost:8081';
testLib.KAFKA_BROKER_URL = 'localhost:9092';
testLib.topic = schemaFix.name;
testLib.topicTwo = schemaFix.name + 'Two';
// curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
//   --data '{"schema": "{\"name\": \"string\", \"long\": \"long\"}"}' \
//   http://schema-registry-confluent.internal.dev.waldo.photos/subjects/test-thanpolas-Kafka/versions
var testBoot = false;
@@ -33,30 +32,37 @@
beforeEach(function() {
  var schemaCreateUrl = testLib.KAFKA_SCHEMA_REGISTRY_URL +
    '/subjects/' + testLib.topic + '-value/versions';
  var data = {
    schema: JSON.stringify(schemaFix),
  };
  console.log('DATA:', data);
  return axios({
    url: schemaCreateUrl,
    method: 'post',
    headers: {
      'Content-Type': 'application/vnd.schemaregistry.v1+json',
    },
    data: data,
  })
  .catch(function(err) {
    console.error('Axios SR creation failed:', err);
    throw err;
  });
  return Promise.all([
    testLib.registerSchema(testLib.topic, schemaFix),
    testLib.registerSchema(testLib.topicTwo, schemaFix),
  ]);
});
// # Register a new version of a schema under the subject "Kafka-value"
// $ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
//   --data '{"schema": "{\"type\": \"string\"}"}' \
//   http://localhost:8081/subjects/Kafka-value/versions
//   {"id":1}
};
/**
 * Register a schema on SR.
 *
 * @param {string} topic The topic.
 * @param {Object} schema The schema to register.
 * @return {Promise} A Promise.
 */
testLib.registerSchema = Promise.method(function(topic, schema) {
  var schemaCreateUrl = testLib.KAFKA_SCHEMA_REGISTRY_URL +
    '/subjects/' + topic + '-value/versions';
  var data = {
    schema: JSON.stringify(schema),
  };
  return axios({
    url: schemaCreateUrl,
    method: 'post',
    headers: {
      'Content-Type': 'application/vnd.schemaregistry.v1+json',
    },
    data: data,
  })
  .catch(function(err) {
    console.error('Axios SR creation failed:', err);
    throw err;
  });
});
/** @type {Object} simple logger */
@@ -63,0 +69,0 @@ testLib.log = {
@@ -10,2 +10,3 @@ /**
describe('Produce', function() {
@@ -24,17 +25,6 @@ testLib.init();
beforeEach(function() {
  return this.kafkaAvro.getConsumer({
    'group.id': 'kafka-avro-test',
    'socket.keepalive.enable': true,
    'enable.auto.commit': true,
  })
  .then((consumer) => {
    this.consumer = consumer;
  });
});
beforeEach(function() {
  return this.kafkaAvro.getProducer({
    'dr_cb': true,
  })
  .then((producer) => {
  .then(function (producer) {
    this.producer = producer;
@@ -51,6 +41,5 @@
    producer.on('delivery-report', function(report) {
      console.log('delivery-report:' + JSON.stringify(report));
    producer.on('delivery-report', function() {
      this.gotReceipt = true;
    });
    }.bind(this));
@@ -61,7 +50,11 @@ this.producerTopic = producer.Topic(testLib.topic, {
    });
  });
  }.bind(this));
});
afterEach(function(done) {
  this.producer.disconnect(function(err) {
    done(err);
  });
});
it.only('should produce and consume a message', function(done) {
  console.log('test start');
it('should produce a message', function(done) {
  var message = {
@@ -71,36 +64,4 @@ name: 'Thanasis',
  };
  this.producer.produce(this.producerTopic, -1, message, 'key');
  var stream = this.consumer.getReadStream(testLib.topic, {
    waitInterval: 0
  });
  console.log('stream:', typeof stream.pipe);
  stream.on('error', function(err) {
    console.error('FATAL Stream error:', err);
    done(err);
  });
  this.consumer.on('error', function(err) {
    console.error('Consumer Error:', err);
  });
  // stream
  //   .pipe((data) => {
  //     console.log('GOT:', data);
  //     expect(data).to.deep.equal(message);
  //     this.consumer.disconnect();
  //     done();
  //   });
  stream.on('data', (data) => {
    console.log('GOT:', data);
    expect(data).to.deep.equal(message);
    this.consumer.disconnect();
    done();
  });
  console.log('producing...');
  // Produce message
  this.producer.produce(testLib.topic, this.producerTopic, -1, message, 'key');
  // need to keep polling for a while to ensure the delivery reports are received
@@ -111,7 +72,26 @@ var pollLoop = setInterval(() => {
      clearInterval(pollLoop);
      this.producer.disconnect();
      done();
    }
  }, 1000);
});
it('should not allow invalid type', function() {
  var message = {
    name: 'Thanasis',
    long: '540',
  };
  var binded = this.producer.produce.bind(this.producer, this.producerTopic, -1, message, 'key');
  expect(binded).to.throw(Error);
});
it('should not allow less attributes', function() {
  var message = {
    name: 'Thanasis',
  };
  var binded = this.producer.produce.bind(this.producer, this.producerTopic, -1, message, 'key');
  expect(binded).to.throw(Error);
});
});