kafka-avro
Advanced tools
Comparing version 0.1.3 to 0.2.0
@@ -26,2 +26,4 @@ /* | ||
function noop() {} | ||
/** | ||
@@ -34,2 +36,3 @@ * @fileOverview bootstrap and master exporing module. | ||
* | ||
* @param {Object} opts The options. | ||
* @constructor | ||
@@ -44,2 +47,10 @@ */ | ||
/** @type {boolean} Indicates if we se/deserialize with MAGIC BYTE */ | ||
this.hasMagicByte = !!opts.hasMagicByte; | ||
/** @type {Array.<node-rdkafka.Producer>} Instanciated producers. */ | ||
this._producers = []; | ||
/** @type {Array.<node-rdkafka.Consumer>} Instanciated consumers. */ | ||
this._consumers = []; | ||
/** | ||
@@ -60,2 +71,15 @@ * A dict containing all the value schemas with key the bare topic name and | ||
this.keySchemas = {}; | ||
/** | ||
* A dict containing all the value schemas metadata, with key the bare | ||
* topic name and value the SR response on that topic: | ||
* | ||
* 'subject' {string} The full topic name, including the '-value' suffix. | ||
* 'version' {number} The version number of the schema. | ||
* 'id' {number} The schema id. | ||
* 'schema' {string} JSON serialized schema. | ||
* | ||
* @type {Object} | ||
*/ | ||
this.schemaMeta = {}; | ||
}); | ||
@@ -89,2 +113,27 @@ | ||
/** | ||
* Dispose the method. | ||
* | ||
* @return {Promise} A Promise. | ||
*/ | ||
KafkaAvro.prototype.dispose = Promise.method(function () { | ||
var disconnectPromises = []; | ||
this._consumers.forEach(function(consumer) { | ||
var discon = Promise.promisify(consumer.disconnect.bind(consumer)); | ||
var disconProm = discon().catch(noop); | ||
disconnectPromises.push(disconProm); | ||
}); | ||
this._producers.forEach(function(producer) { | ||
var discon = Promise.promisify(producer.disconnect.bind(producer)); | ||
var disconProm = discon().catch(noop); | ||
disconnectPromises.push(disconProm); | ||
}); | ||
this.hasMagicByte = false; | ||
return Promise.all(disconnectPromises); | ||
}); | ||
/** | ||
* Fetch all registered schema topics from SR. | ||
@@ -125,2 +174,3 @@ * | ||
return { | ||
responseRaw: response.data, | ||
schemaType: schemaType, | ||
@@ -149,6 +199,9 @@ schemaTopicRaw: schemaTopic, | ||
ex.message); | ||
return schemaObj; | ||
} | ||
if (schemaObj.schemaType.toLowerCase() === 'value') { | ||
this.valueSchemas[schemaObj.topic] = schemaObj.type; | ||
this.schemaMeta[schemaObj.topic] = schemaObj.responseRaw; | ||
} else { | ||
@@ -155,0 +208,0 @@ this.keySchemas[schemaObj.topic] = schemaObj.type; |
@@ -10,2 +10,4 @@ /** | ||
var magicByte = require('./magic-byte'); | ||
/** | ||
@@ -34,2 +36,4 @@ * Wrapper for node-rdkafka Consumer Ctor, a mixin. | ||
this._consumers.push(consumer); | ||
consumer.on('event.log', function(log) { | ||
@@ -74,2 +78,6 @@ console.log('node-rdkafka log:', log); | ||
stream.on('error', function(err) { | ||
console.error('KafkaAvro.Consumer :: Read Stream Error:', err); | ||
}); | ||
return stream | ||
@@ -106,3 +114,8 @@ .pipe(new Transform({ | ||
message.parsed = this.valueSchemas[message.topic].fromBuffer(message.value); | ||
var type = this.valueSchemas[message.topic]; | ||
if (this.hasMagicByte) { | ||
message.parsed = magicByte.fromMessageBuffer(type, message.value).value; | ||
} else { | ||
message.parsed = type.fromBuffer(message.value); | ||
} | ||
@@ -113,3 +126,11 @@ cb(message); | ||
/** | ||
* Callback for Stream Transform, will deserialize the message properly. | ||
* | ||
* @param {string} topicName The topic name. | ||
* @param {*} data The message to encode. | ||
* @param {string=} encoding Encoding of the message. | ||
* @param {Function} callback Callback to call when done. | ||
* @private | ||
*/ | ||
Consumer.prototype._transformAvro = function (topicName, data, encoding, callback) { | ||
@@ -124,4 +145,12 @@ if (!this.valueSchemas[topicName]) { | ||
} | ||
data.parsed = this.valueSchemas[topicName].fromBuffer(data.value); | ||
var type = this.valueSchemas[topicName]; | ||
console.log('consuming STREAM topic:', data.topic, 'has byte:', this.hasMagicByte); | ||
if (this.hasMagicByte) { | ||
data.parsed = magicByte.fromMessageBuffer(type, data.value).value; | ||
} else { | ||
data.parsed = type.fromBuffer(data.value); | ||
} | ||
callback(null, data); | ||
}; |
@@ -8,2 +8,4 @@ /** | ||
var magicByte = require('./magic-byte'); | ||
/** | ||
@@ -34,2 +36,4 @@ * Wrapper for node-rdkafka Produce Ctor, a mixin. | ||
this._producers.push(producer); | ||
// hack node-rdkafka | ||
@@ -56,5 +60,6 @@ producer.__kafkaAvro_produce = producer.produce; | ||
* @param {string|number} key The partioning key. | ||
* @param {*=} optOpaque Pass vars to receipt handler. | ||
*/ | ||
Producer.prototype._produceWrapper = function (producerInstance, kafkaTopic, | ||
partition, value, key) { | ||
partition, value, key, optOpaque) { | ||
@@ -67,8 +72,19 @@ var topicName = kafkaTopic.name(); | ||
var bufVal = new Buffer(JSON.stringify(value)); | ||
return producerInstance.__kafkaAvro_produce(kafkaTopic, partition, bufVal, key); | ||
return producerInstance.__kafkaAvro_produce(kafkaTopic, partition, bufVal, | ||
key, optOpaque); | ||
} | ||
var bufValue = this.valueSchemas[topicName].toBuffer(value); | ||
var type = this.valueSchemas[topicName]; | ||
var schemaId = this.schemaMeta[topicName].id; | ||
return producerInstance.__kafkaAvro_produce(kafkaTopic, partition, bufValue, key); | ||
var bufValue; | ||
if (this.hasMagicByte) { | ||
bufValue = magicByte.toMessageBuffer(value, type, schemaId); | ||
} else { | ||
bufValue = type.toBuffer(value); | ||
} | ||
return producerInstance.__kafkaAvro_produce(kafkaTopic, partition, bufValue, | ||
key, optOpaque); | ||
}; |
{ | ||
"name": "kafka-avro", | ||
"version": "0.1.3", | ||
"version": "0.2.0", | ||
"main": "./lib/kafka-avro", | ||
@@ -24,3 +24,3 @@ "description": "Node.js bindings for librdkafka with Avro schema serialization.", | ||
"scripts": { | ||
"test": "mocha -b test/spec && eslint lib/**" | ||
"test": "eslint lib && mocha -b test/spec && eslint lib/**" | ||
}, | ||
@@ -27,0 +27,0 @@ "dependencies": { |
@@ -5,2 +5,4 @@ # kafka-avro | ||
[![CircleCI](https://circleci.com/gh/waldophotos/kafka-avro/tree/master.svg?style=svg)](https://circleci.com/gh/waldophotos/kafka-avro/tree/master) | ||
The kafka-avro library is a wrapper that combines the [node-rdkafka][node-rdkafka] and [avsc](avsc) libraries to allow for Production and Consumption of messages on kafka validated and serialized by Avro. | ||
@@ -46,2 +48,3 @@ | ||
schemaRegistry: 'localhost:8081', | ||
hasMagicByte: 'true', | ||
}); | ||
@@ -57,2 +60,10 @@ | ||
### Kafka-avro options | ||
When instantiating kafka-avro you may pass the following options: | ||
* `kafkaBroker` **String REQUIRED** The url or comma delimited strings pointing to your kafka brokers. | ||
* `schemaRegistry` **String REQUIRED** The url to the Schema Registry. | ||
* `hasMagicByte` **Boolean** *Default*: `false` Enable this for Confluence Schema Registry Magic Byte insertion. | ||
### Producer | ||
@@ -62,3 +73,3 @@ | ||
By inoking the `kafkaAvro.getProducer()` method, kafka-avro will instantiate a Producer, make it connect and wait for it to be ready before the promise is resolved. | ||
By invoking the `kafkaAvro.getProducer()` method, kafka-avro will instantiate a Producer, make it connect and wait for it to be ready before the promise is resolved. | ||
@@ -181,2 +192,4 @@ ```js | ||
- **v0.2.0**, *30 Jan 2016* | ||
- Added Confluent's Magic Byte support when encoding and decoding messages. | ||
- **v0.1.2**, *27 Jan 2016* | ||
@@ -183,0 +196,0 @@ - Suppress schema parsing errors. |
@@ -24,8 +24,8 @@ /* | ||
testLib.init = function() { | ||
if (testBoot) { | ||
return; | ||
} | ||
testBoot = true; | ||
beforeEach(function() { | ||
if (testBoot) { | ||
return; | ||
} | ||
testBoot = true; | ||
beforeEach(function() { | ||
return Promise.all([ | ||
@@ -52,2 +52,5 @@ testLib.registerSchema(testLib.topic, schemaFix), | ||
}; | ||
console.log('TEST :: Registering schema:', topic, 'on SR:', schemaCreateUrl); | ||
return axios({ | ||
@@ -54,0 +57,0 @@ url: schemaCreateUrl, |
/** | ||
* @fileOverview Test produce and consume messages using kafka-avro. | ||
*/ | ||
var crypto = require('crypto'); | ||
@@ -11,2 +12,3 @@ var chai = require('chai'); | ||
function noop () {} | ||
@@ -27,4 +29,5 @@ describe('Consume', function() { | ||
this.consOpts = { | ||
'group.id': 'testKafkaAvro', | ||
'group.id': 'testKafkaAvro' + crypto.randomBytes(20).toString('hex'), | ||
'enable.auto.commit': true, | ||
// 'auto.offset.reset': 'earliest', | ||
// 'session.timeout.ms': 1000, | ||
@@ -36,2 +39,5 @@ }; | ||
this.consumer = consumer; | ||
this.consumer.on('error', function(err) { | ||
console.log('consumerError:', err); | ||
}); | ||
}); | ||
@@ -74,9 +80,7 @@ }); | ||
afterEach(function() { | ||
return this.kafkaAvro.dispose(); | ||
}); | ||
describe('Consumer direct "on"', function() { | ||
afterEach(function(done) { | ||
this.consumer.disconnect(function() { | ||
done(); | ||
}); | ||
}); | ||
it('should produce and consume a message using consume "on"', function(done) { | ||
@@ -110,12 +114,12 @@ var produceTime = 0; | ||
this.producer.produce(this.producerTopic, -1, message, 'key'); | ||
}, 2000); | ||
}, 4000); | ||
//need to keep polling for a while to ensure the delivery reports are received | ||
var pollLoop = setInterval(function () { | ||
this.producer.poll(); | ||
if (this.gotReceipt) { | ||
clearInterval(pollLoop); | ||
this.producer.disconnect(); | ||
} | ||
}.bind(this), 1000); | ||
// //need to keep polling for a while to ensure the delivery reports are received | ||
// var pollLoop = setInterval(function () { | ||
// this.producer.poll(); | ||
// if (this.gotReceipt) { | ||
// clearInterval(pollLoop); | ||
// this.producer.disconnect(); | ||
// } | ||
// }.bind(this), 1000); | ||
}); | ||
@@ -141,3 +145,2 @@ | ||
this.consumer.on('data', function(rawData) { | ||
console.log('GOT:', rawData); | ||
if (rawData.topic === testLib.topic) { | ||
@@ -169,13 +172,3 @@ receivedOne = true; | ||
}, 2000); | ||
//need to keep polling for a while to ensure the delivery reports are received | ||
var pollLoop = setInterval(function () { | ||
this.producer.poll(); | ||
if (this.gotReceipt) { | ||
clearInterval(pollLoop); | ||
this.producer.disconnect(); | ||
} | ||
}.bind(this), 1000); | ||
}); | ||
}); | ||
@@ -195,2 +188,3 @@ | ||
}); | ||
stream.on('error', noop); | ||
@@ -212,3 +206,2 @@ stream.on('data', function(dataRaw) { | ||
setTimeout(() => { | ||
@@ -218,12 +211,4 @@ produceTime = Date.now(); | ||
}, 2000); | ||
//need to keep polling for a while to ensure the delivery reports are received | ||
var pollLoop = setInterval(function () { | ||
this.producer.poll(); | ||
if (this.gotReceipt) { | ||
clearInterval(pollLoop); | ||
this.producer.disconnect(); | ||
} | ||
}.bind(this), 1000); | ||
}); | ||
}); | ||
}); |
@@ -10,3 +10,2 @@ /** | ||
describe('Produce', function() { | ||
@@ -78,3 +77,4 @@ testLib.init(); | ||
var binded = this.producer.produce.bind(this.producer, this.producerTopic, -1, message, 'key'); | ||
var binded = this.producer.produce.bind(this.producer, this.producerTopic, | ||
-1, message, 'key'); | ||
@@ -81,0 +81,0 @@ expect(binded).to.throw(Error); |
@@ -9,5 +9,6 @@ /** | ||
var srUrl = 'http://schema-registry-confluent.internal.dev.waldo.photos'; | ||
// var srUrl = 'http://schema-registry-confluent.internal.dev.waldo.photos'; | ||
var srUrl = 'http://localhost:8081'; | ||
describe('Initialization of RS', function() { | ||
describe('Initialization of SR', function() { | ||
it('should initialize properly', function() { | ||
@@ -21,2 +22,3 @@ var kafkaAvro = new KafkaAvro({ | ||
expect(res).to.have.keys([ | ||
'responseRaw', | ||
'schemaType', | ||
@@ -28,2 +30,9 @@ 'topic', | ||
]); | ||
expect(res.responseRaw).to.have.keys([ | ||
'subject', | ||
'version', | ||
'id', | ||
'schema', | ||
]); | ||
}) | ||
@@ -34,2 +43,29 @@ .then((all) => { | ||
}); | ||
it('kafkaAvro instance should contain expected values after init', function() { | ||
var kafkaAvro = new KafkaAvro({ | ||
schemaRegistry: srUrl, | ||
}); | ||
return kafkaAvro.init() | ||
.map((res) => { | ||
if (res.schemaType.toLowerCase() === 'value') { | ||
expect(kafkaAvro.valueSchemas[res.topic]).to.be.an('object'); | ||
expect(kafkaAvro.schemaMeta[res.topic]).to.be.an('object'); | ||
expect(kafkaAvro.schemaMeta[res.topic]).to.have.keys([ | ||
'subject', | ||
'version', | ||
'id', | ||
'schema', | ||
]); | ||
} else { | ||
expect(kafkaAvro.keySchemas[res.topic]).to.be.an('object'); | ||
} | ||
}) | ||
.then((all) => { | ||
expect(all).to.have.length.of.at.least(1); | ||
}); | ||
}); | ||
}); |
43123
21
1015
208