kafka-avro
Advanced tools
Comparing version 0.1.0 to 0.1.1
@@ -16,2 +16,3 @@ /* | ||
var avro = require('avsc'); | ||
var Kafka = require('node-rdkafka'); | ||
@@ -59,2 +60,9 @@ // | ||
/** | ||
* Expose the node-rdkafka library's CODES constants. | ||
* | ||
* @type {Object} | ||
*/ | ||
KafkaAvro.CODES = Kafka.CODES; | ||
// | ||
@@ -61,0 +69,0 @@ // Add Mixins |
{ | ||
"name": "kafka-avro", | ||
"version": "0.1.0", | ||
"version": "0.1.1", | ||
"main": "./lib/kafka-avro", | ||
@@ -5,0 +5,0 @@ "description": "Node.js bindings for librdkafka with Avro schema serialization.", |
128
README.md
@@ -25,4 +25,14 @@ # kafka-avro | ||
You are highly encouraged to read the ["node-rdkafka" documentation](https://blizzard.github.io/node-rdkafka/current/) on the way you can use the Producer and Consumer instances as well as check out the [available configuration options of node-rdkafka](https://github.com/edenhill/librdkafka/blob/2213fb29f98a7a73f22da21ef85e0783f6fd67c4/CONFIGURATION.md) | ||
You are highly encouraged to read the ["node-rdkafka" documentation](https://blizzard.github.io/node-rdkafka/current/), as it explains how you can use the Producer and Consumer instances as well as check out the [available configuration options of node-rdkafka](https://github.com/edenhill/librdkafka/blob/2213fb29f98a7a73f22da21ef85e0783f6fd67c4/CONFIGURATION.md). | ||
### node-rdkafka CODES | ||
The `Kafka.CODES` enumeration of constant values provided by the "node-rdkafka" library is also available as a static var at: | ||
```js | ||
var KafkaAvro = require('kafka-avro'); | ||
console.log(KafkaAvro.CODES); | ||
``` | ||
### Initialize kafka-avro | ||
@@ -38,6 +48,2 @@ | ||
kafkaAvro.on('log', function(message) { | ||
console.log(message); | ||
}) | ||
// Query the Schema Registry for all topic-schema's | ||
@@ -51,36 +57,34 @@ // fetch them and evaluate them. | ||
### Quick Usage Producer | ||
### Producer | ||
> NOTICE: You need to initialize kafka-avro before you can produce or consume messages. | ||
By inoking the `kafkaAvro.getProducer()` method, kafka-avro will instantiate a Producer, make it connect and wait for it to be ready before the promise is resolved. | ||
```js | ||
var producer = kafkaAvro.getProducer({ | ||
kafkaAvro.getProducer({ | ||
// Options listed bellow | ||
}); | ||
}) | ||
// "getProducer()" returns a Bluebird Promise. | ||
.then(function(producer) { | ||
var topicName = 'test'; | ||
var topicName = 'test'; | ||
producer.on('disconnected', function(arg) { | ||
console.log('producer disconnected. ' + JSON.stringify(arg)); | ||
}); | ||
//Wait for the ready event before producing | ||
producer.on('ready', function() { | ||
//Create a Topic object with any options our Producer | ||
//should use when producing to that topic. | ||
var topic = producer.Topic(topicName, { | ||
// Make the Kafka broker acknowledge our message (optional) | ||
'request.required.acks': 1 | ||
}); | ||
//Create a Topic object with any options our Producer | ||
//should use when producing to that topic. | ||
var topic = producer.Topic(topicName, { | ||
// Make the Kafka broker acknowledge our message (optional) | ||
'request.required.acks': 1 | ||
}); | ||
var value = new Buffer('value-' +i); | ||
var key = 'key'; | ||
var value = new Buffer('value-' +i); | ||
var key = 'key'; | ||
// if partition is set to -1, librdkafka will use the default partitioner | ||
var partition = -1; | ||
producer.produce(topic, partition, value, key); | ||
}); | ||
producer.on('disconnected', function(arg) { | ||
console.log('producer disconnected. ' + JSON.stringify(arg)); | ||
}); | ||
//starting the producer | ||
producer.connect(); | ||
// if partition is set to -1, librdkafka will use the default partitioner | ||
var partition = -1; | ||
producer.produce(topic, partition, value, key); | ||
}) | ||
``` | ||
@@ -93,33 +97,56 @@ | ||
### Quick Usage Consumer | ||
### Consumer | ||
> NOTICE: You need to initialize kafka-avro before you can produce or consume messages. | ||
By inoking the `kafkaAvro.getConsumer()` method, kafka-avro will instantiate a Consumer, make it connect and wait for it to be ready before the promise is resolved. | ||
#### Consumer using events to consume | ||
```js | ||
var Transform = require('stream').Transform; | ||
kafkaAvro.getConsumer({ | ||
'group.id': 'librd-test', | ||
'socket.keepalive.enable': true, | ||
'enable.auto.commit': true, | ||
}) | ||
// the "getConsumer()" method will return a bluebird promise. | ||
.then(function(consumer) { | ||
var topicName = 'test'; | ||
this.consumer.consume([topicName]); | ||
this.consumer.on('data', function(rawData) { | ||
console.log('data:', rawData); | ||
}); | ||
}); | ||
``` | ||
var consumer = kafkaAvro.getConsumer({ | ||
#### Consumer using streams to consume | ||
```js | ||
kafkaAvro.getConsumer({ | ||
'group.id': 'librd-test', | ||
'socket.keepalive.enable': true, | ||
'enable.auto.commit': true, | ||
}); | ||
}) | ||
// the "getConsumer()" method will return a bluebird promise. | ||
.then(function(consumer) { | ||
var topicName = 'test'; | ||
var topicName = 'test'; | ||
var stream = consumer.getReadStream(topicName, { | ||
waitInterval: 0 | ||
}); | ||
var stream = consumer.getReadStream(topicName, { | ||
waitInterval: 0 | ||
}); | ||
stream.on('error', function() { | ||
process.exit(1); | ||
}); | ||
stream.on('error', function() { | ||
process.exit(1); | ||
}); | ||
consumer.on('error', function(err) { | ||
console.log(err); | ||
process.exit(1); | ||
}); | ||
consumer.on('error', function(err) { | ||
console.log(err); | ||
process.exit(1); | ||
}); | ||
stream.on('data', function(message) { | ||
console.log('Received message:', message); | ||
}); | ||
stream.on('data', function(message) { | ||
console.log('Received message:', message); | ||
}); | ||
}); | ||
``` | ||
@@ -154,2 +181,5 @@ | ||
- **v0.1.1**, *27 Jan 2016* | ||
- Expose CODES enum from node-rdkafka. | ||
- Write more docs, add the event based consuming method. | ||
- **v0.1.0**, *26 Jan 2016* | ||
@@ -156,0 +186,0 @@ - First fully working release. |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
31799
698
191