kafka-avro
Comparing version 0.0.1 to 0.1.0
@@ -24,3 +24,3 @@ /**
  */
-Consumer.prototype.getConsumer = Promise.method(function (opts) {
+Consumer.prototype.getConsumer = Promise.method(function (topicName, opts) {
   if (!opts['metadata.broker.list']) {
@@ -30,4 +30,10 @@ opts['metadata.broker.list'] = this.kafkaBrokerUrl;
   console.log('KafkaAvro :: Starting Consumer with opts:', opts);
   var consumer = new kafka.KafkaConsumer(opts);
+  consumer.on('event.log', function(log) {
+    console.log('node-rdkafka log:', log);
+  });
   // hack node-rdkafka
@@ -37,3 +43,12 @@ consumer.__kafkaAvro_getReadStream = consumer.getReadStream;
-  return consumer;
+  consumer.__kafkaAvro_on = consumer.on;
+  consumer.on = this._onWrapper.bind(this, consumer);
+  return new Promise(function(resolve) {
+    consumer.on('ready', function() {
+      resolve(consumer);
+    });
+    consumer.connect();
+  });
 });
@@ -63,17 +78,48 @@
     objectMode: true,
-    transform: function(data, encoding, callback) {
-      var value;
-      if (!this.valueSchemas[topic]) {
-        console.log('KafkaAvro :: Warning, consumer did not find topic on SR:',
-          topic);
-        value = JSON.parse(data.toString('utf-8'));
-        callback(null, value);
-        return;
-      }
-      value = this.valueSchemas[topic].fromBuffer(data.message);
-      // We want to force it to run async
-      callback(null, value);
-    }
+    transform: this._transformAvro.bind(this, topic),
   }));
 };
+/**
+ * The node-rdkafka on method wrapper, will intercept "data" events and
+ * deserialize the incoming message using the existing schemas.
+ *
+ * @param {kafka.KafkaConsumer} consumerInstance node-rdkafka instance.
+ * @param {string} eventName the name to listen for events on.
+ * @param {Function} cb Event callback.
+ * @private
+ */
+Consumer.prototype._onWrapper = function (consumerInstance, eventName, cb) {
+  if (eventName !== 'data') {
+    return consumerInstance.__kafkaAvro_on(eventName, cb);
+  }
+  consumerInstance.__kafkaAvro_on('data', function(message) {
+    if (!this.valueSchemas[message.topic]) {
+      console.log('KafkaAvro :: Warning, consumer did not find topic on SR:',
+        message.topic);
+      message.parsed = JSON.parse(message.value.toString('utf-8'));
+      cb(message);
+      return;
+    }
+    message.parsed = this.valueSchemas[message.topic].fromBuffer(message.value);
+    cb(message);
+  }.bind(this));
+};
+/**
+ * Transform stream callback that Avro-deserializes incoming messages.
+ *
+ * @param {string} topicName The topic name.
+ * @param {Object} data The incoming message object.
+ * @param {string} encoding Stream encoding (unused).
+ * @param {Function} callback Stream transform callback.
+ * @private
+ */
+Consumer.prototype._transformAvro = function (topicName, data, encoding, callback) {
+  if (!this.valueSchemas[topicName]) {
+    console.log('KafkaAvro :: Warning, consumer did not find topic on SR:',
+      topicName);
+    data.parsed = JSON.parse(data.value.toString('utf-8'));
+    callback(null, data);
+    return;
+  }
+  data.parsed = this.valueSchemas[topicName].fromBuffer(data.value);
+  callback(null, data);
+};

@@ -50,3 +50,2 @@ /**
  * @param {kafka.Producer} producerInstance node-rdkafka instance.
- * @param {string} topic Topic to produce on.
  * @param {kafka.Producer.Topic} kafkaTopic node-rdkafka Topic instance.
@@ -57,14 +56,17 @@ * @param {number} partition The partition to produce on.
  */
-Producer.prototype._produceWrapper = function (producerInstance, topic, kafkaTopic,
+Producer.prototype._produceWrapper = function (producerInstance, kafkaTopic,
   partition, value, key) {
-  if (!this.valueSchemas[topic]) {
+  var topicName = kafkaTopic.name();
+  if (!this.valueSchemas[topicName]) {
     // topic not found in schemas, bail early
-    console.log('KafkaAvro :: Warning, did not find topic on SR:', topic);
+    console.log('KafkaAvro :: Warning, did not find topic on SR:', topicName);
     var bufVal = new Buffer(JSON.stringify(value));
-    return producerInstance.__kafkaAvro_produce(topic, partition, bufVal, key);
+    return producerInstance.__kafkaAvro_produce(kafkaTopic, partition, bufVal, key);
   }
-  var bufValue = this.valueSchemas[topic].toBuffer(value);
-  return producerInstance.__kafkaAvro_produce(topic, partition, bufValue, key);
+  var bufValue = this.valueSchemas[topicName].toBuffer(value);
+  return producerInstance.__kafkaAvro_produce(kafkaTopic, partition, bufValue, key);
 };
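
The net effect on callers is that `produce()` is now invoked with the node-rdkafka `Topic` instance only, and the wrapper resolves the schema from `kafkaTopic.name()`. A sketch of the 0.1.0 call pattern, again assuming an initialized `kafkaAvro` instance; topic name, topic options, and payload are illustrative:

```js
// Sketch only: assumes kafkaAvro.init() has completed and a value schema
// for 'test-topic' is registered on the Schema Registry.
kafkaAvro.getProducer({ 'dr_cb': true })
  .then(function(producer) {
    var topic = producer.Topic('test-topic', {
      'request.required.acks': 1, // illustrative topic option
    });
    var value = { name: 'Thanasis', long: 540 };

    // _produceWrapper intercepts this call, looks up the Avro schema by
    // topic.name(), serializes `value` with toBuffer(), and hands the
    // resulting buffer to the real produce().
    producer.produce(topic, -1, value, 'key');
  });
```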

 {
   "name": "kafka-avro",
-  "version": "0.0.1",
+  "version": "0.1.0",
   "main": "./lib/kafka-avro",
   "description": "Node.js bindings for librdkafka with Avro schema serialization.",

@@ -5,3 +5,3 @@ # kafka-avro
-**WARNING** Still WIP, consumer not working!
+The kafka-avro library is a wrapper that combines the [node-rdkafka][node-rdkafka] and [avsc][avsc] libraries to allow for production and consumption of messages on Kafka, validated and serialized with Avro.
@@ -18,5 +18,14 @@ ## Install
+The kafka-avro library operates in the following steps:
+
+1. You provide your Kafka Brokers and Schema Registry (SR) URL to a new instance of kafka-avro.
+1. You initialize kafka-avro, which tells the library to query the SR for all registered schemas, evaluate them, and store them in runtime memory.
+1. kafka-avro will then expose the `getConsumer()` and `getProducer()` methods, which both return instances of the corresponding constructors from the [node-rdkafka][node-rdkafka] library.
+
+The instances of "node-rdkafka" that are returned by kafka-avro are hacked so as to intercept produced and consumed messages and run them through the Avro de/serializer.
+
+You are highly encouraged to read the ["node-rdkafka" documentation](https://blizzard.github.io/node-rdkafka/current/) on the way you can use the Producer and Consumer instances, as well as check out the [available configuration options of node-rdkafka](https://github.com/edenhill/librdkafka/blob/2213fb29f98a7a73f22da21ef85e0783f6fd67c4/CONFIGURATION.md).
 ### Initialize kafka-avro
 Initialize kafka-avro:
 ```js
@@ -67,3 +76,3 @@ var KafkaAvro = require('kafka-avro');
 var partition = -1;
-producer.produce(topicName, topic, partition, value, key);
+producer.produce(topic, partition, value, key);
 });
@@ -86,2 +95,4 @@
+> NOTICE: You need to initialize kafka-avro before you can produce or consume messages.
+
 ```js
@@ -106,15 +117,10 @@ var Transform = require('stream').Transform;
-stream
-  .pipe(new Transform({
-    objectMode: true,
-    transform: function(data, encoding, callback) {
-      // do your async stuff, then:
-      callback(null, data.message);
-    }
-  }))
-  .pipe(process.stdout);
 consumer.on('error', function(err) {
   console.log(err);
   process.exit(1);
 });
+
+stream.on('data', function(message) {
+  console.log('Received message:', message);
+});
 ```
@@ -127,2 +133,14 @@
+#### Consumer Data Object
+
+kafka-avro intercepts all incoming messages and augments the object with one more property, named `parsed`, which contains the Avro-deserialized object. Here is a breakdown of the properties included in the `message` object you receive when consuming messages:
+
+* `value` **Buffer** The raw message buffer from Kafka.
+* `size` **Number** The size of the message, in bytes.
+* `key` **String|Number** Partitioning key used.
+* `topic` **String** The topic this message comes from.
+* `offset` **Number** The Kafka offset.
+* `partition` **Number** The Kafka partition used.
+* `parsed` **Object** The Avro-deserialized message as a JS Object literal.
+
 ## Releasing
@@ -138,2 +156,4 @@
+- **v0.1.0**, *26 Jan 2016*
+    - First fully working release.
 - **v0.0.1**, *25 Jan 2016*
@@ -145,1 +165,4 @@ - Big Bang
 Copyright Waldo, Inc. All rights reserved.
+
+[avsc]: https://github.com/mtth/avsc
+[node-rdkafka]: https://github.com/Blizzard/node-rdkafka
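
To make the README's new "Consumer Data Object" section concrete, here is roughly what a consumed message looks like by the time it reaches a handler; all values below are made up for illustration:

```js
consumer.on('data', function(message) {
  // Shape per the README's property list; values are illustrative.
  // message.value     -> <Buffer ...>  raw Avro-encoded bytes from Kafka
  // message.size      -> 12            message size in bytes
  // message.key       -> 'key'         partitioning key
  // message.topic     -> 'test-topic'  source topic
  // message.offset    -> 42            Kafka offset
  // message.partition -> 0             partition the message came from
  // message.parsed    -> { name: 'Thanasis', long: 540 }  added by kafka-avro
  console.log(message.topic, message.offset, message.parsed);
});
```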

@@ -5,2 +5,3 @@ /*
 var axios = require('axios');
+var Promise = require('bluebird');
@@ -11,10 +12,8 @@ var schemaFix = require('../fixtures/schema.fix');
-testLib.KAFKA_SCHEMA_REGISTRY_URL = 'http://schema-registry-confluent.internal.dev.waldo.photos';
-testLib.KAFKA_BROKER_URL = 'broker-1.service.consul:9092,broker-3.service.consul:9092,broker-2.service.consul:9092';
+testLib.KAFKA_SCHEMA_REGISTRY_URL = 'http://localhost:8081';
+testLib.KAFKA_BROKER_URL = 'localhost:9092';
 testLib.topic = schemaFix.name;
+testLib.topicTwo = schemaFix.name + 'Two';
-// curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
-//   --data '{"schema": "{\"name\": \"string\", \"long\": \"long\"}"}' \
-//   http://schema-registry-confluent.internal.dev.waldo.photos/subjects/test-thanpolas-Kafka/versions
 var testBoot = false;
@@ -33,30 +32,37 @@
   beforeEach(function() {
-    var schemaCreateUrl = testLib.KAFKA_SCHEMA_REGISTRY_URL +
-      '/subjects/' + testLib.topic + '-value/versions';
-    var data = {
-      schema: JSON.stringify(schemaFix),
-    };
-    console.log('DATA:', data);
-    return axios({
-      url: schemaCreateUrl,
-      method: 'post',
-      headers: {
-        'Content-Type': 'application/vnd.schemaregistry.v1+json',
-      },
-      data: data,
-    })
-      .catch(function(err) {
-        console.error('Axios SR creation failed:', err);
-        throw err;
-      });
+    return Promise.all([
+      testLib.registerSchema(testLib.topic, schemaFix),
+      testLib.registerSchema(testLib.topicTwo, schemaFix),
+    ]);
   });
+
+  // # Register a new version of a schema under the subject "Kafka-value"
+  // $ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
+  //   --data '{"schema": "{\"type\": \"string\"}"}' \
+  //   http://localhost:8081/subjects/Kafka-value/versions
+  // {"id":1}
 };
+/**
+ * Register a schema on SR.
+ *
+ * @param {string} topic The topic.
+ * @param {Object} schema The schema to register.
+ * @return {Promise} A Promise.
+ */
+testLib.registerSchema = Promise.method(function(topic, schema) {
+  var schemaCreateUrl = testLib.KAFKA_SCHEMA_REGISTRY_URL +
+    '/subjects/' + topic + '-value/versions';
+  var data = {
+    schema: JSON.stringify(schema),
+  };
+  return axios({
+    url: schemaCreateUrl,
+    method: 'post',
+    headers: {
+      'Content-Type': 'application/vnd.schemaregistry.v1+json',
+    },
+    data: data,
+  })
+    .catch(function(err) {
+      console.error('Axios SR creation failed:', err);
+      throw err;
+    });
+});
 /** @type {Object} simple logger */
 testLib.log = {
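
For context, the new `registerSchema()` helper performs over HTTP what the inline curl comment above shows: it POSTs the schema to the SR under the `<topic>-value` subject. A usage sketch; the record shape is illustrative, since the real fixture lives in `../fixtures/schema.fix`:

```js
// Sketch only: registers an illustrative record schema for 'test-topic'
// against the local Schema Registry configured in testLib.
testLib.registerSchema('test-topic', {
  type: 'record',
  name: 'testTopic', // illustrative fixture shape, not the actual fixture
  fields: [
    { name: 'name', type: 'string' },
    { name: 'long', type: 'long' },
  ],
})
  .then(function(res) {
    // The SR responds with the registered schema id, e.g. {"id":1}.
    console.log('Registered schema id:', res.data.id);
  });
```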

@@ -10,2 +10,3 @@ /**
 describe('Produce', function() {
@@ -24,17 +25,6 @@ testLib.init();
-  beforeEach(function() {
-    return this.kafkaAvro.getConsumer({
-      'group.id': 'kafka-avro-test',
-      'socket.keepalive.enable': true,
-      'enable.auto.commit': true,
-    })
-      .then((consumer) => {
-        this.consumer = consumer;
-      });
-  });
-
   beforeEach(function() {
     return this.kafkaAvro.getProducer({
       'dr_cb': true,
     })
-      .then((producer) => {
+      .then(function (producer) {
         this.producer = producer;
@@ -51,6 +41,5 @@
-        producer.on('delivery-report', function(report) {
-          console.log('delivery-report:' + JSON.stringify(report));
+        producer.on('delivery-report', function() {
           this.gotReceipt = true;
-        });
+        }.bind(this));
@@ -61,7 +50,11 @@ this.producerTopic = producer.Topic(testLib.topic, {
         });
-      });
+      }.bind(this));
   });

+  afterEach(function(done) {
+    this.producer.disconnect(function(err) {
+      done(err);
+    });
+  });
-  it.only('should produce and consume a message', function(done) {
-    console.log('test start');
+  it('should produce a message', function(done) {
     var message = {
@@ -71,36 +64,4 @@ name: 'Thanasis',
     };
+    this.producer.produce(this.producerTopic, -1, message, 'key');
-    var stream = this.consumer.getReadStream(testLib.topic, {
-      waitInterval: 0
-    });
-    console.log('stream:', typeof stream.pipe);
-    stream.on('error', function(err) {
-      console.error('FATAL Stream error:', err);
-      done(err);
-    });
-    this.consumer.on('error', function(err) {
-      console.error('Consumer Error:', err);
-    });
-    // stream
-    //   .pipe((data) => {
-    //     console.log('GOT:', data);
-    //     expect(data).to.deep.equal(message);
-    //     this.consumer.disconnect();
-    //     done();
-    //   });
-    stream.on('data', (data) => {
-      console.log('GOT:', data);
-      expect(data).to.deep.equal(message);
-      this.consumer.disconnect();
-      done();
-    });
-    console.log('producing...');
-    // Produce message
-    this.producer.produce(testLib.topic, this.producerTopic, -1, message, 'key');
     // need to keep polling for a while to ensure the delivery reports are received
@@ -111,7 +72,26 @@ var pollLoop = setInterval(() => {
         clearInterval(pollLoop);
         this.producer.disconnect();
         done();
       }
     }, 1000);
   });
+
+  it('should not allow invalid type', function() {
+    var message = {
+      name: 'Thanasis',
+      long: '540',
+    };
+    var binded = this.producer.produce.bind(this.producer, this.producerTopic, -1, message, 'key');
+    expect(binded).to.throw(Error);
+  });
+
+  it('should not allow less attributes', function() {
+    var message = {
+      name: 'Thanasis',
+    };
+    var binded = this.producer.produce.bind(this.producer, this.producerTopic, -1, message, 'key');
+    expect(binded).to.throw(Error);
+  });
 });
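
The two new negative tests pass because avsc's `toBuffer()` validates values against the schema and throws synchronously on a mismatch, and `_produceWrapper` calls it without catching. A standalone sketch of that behavior, using an illustrative schema mirroring the test fixture:

```js
var avro = require('avsc');

// Illustrative record type; the real fixture is in test/fixtures/schema.fix.
var type = avro.parse({
  type: 'record',
  name: 'testTopic',
  fields: [
    { name: 'name', type: 'string' },
    { name: 'long', type: 'long' },
  ],
});

type.toBuffer({ name: 'Thanasis', long: 540 });   // serializes fine
type.toBuffer({ name: 'Thanasis', long: '540' }); // throws: '540' is not a long
type.toBuffer({ name: 'Thanasis' });              // throws: missing "long" field
```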