@@ -5,6 +5,4 @@ // This file has been generated from coffee source files | ||
exports.FakeProzess = require('./lib/FakeProzess'); | ||
/* | ||
//@ sourceMappingURL=index.js.map | ||
*/ |
@@ -9,4 +9,2 @@ // This file has been generated from coffee source files | ||
async = require('async'); | ||
zlib = require('zlib'); | ||
@@ -16,7 +14,30 @@ | ||
async = require('async'); | ||
_ = require('underscore'); | ||
/* | ||
Decompressing messages received from Kafka | ||
Transformation stream | ||
TODO: implement snappy | ||
*/ | ||
exports.Decompressor = Decompressor = (function(_super) { | ||
__extends(Decompressor, _super); | ||
/* | ||
Constructs the decompressor. | ||
onErrorDecompressing: function(message, error, detail, next) { | ||
next({msg: error, detail: detail}); | ||
}; | ||
@param {Object} options (optional) | ||
@param {Function} options.onErrorDecompressing (optional) | ||
*/ | ||
function Decompressor(options) { | ||
@@ -26,3 +47,3 @@ options = options || {}; | ||
Decompressor.__super__.constructor.call(this, options); | ||
this.onErrorDecompressing = options.onErrorDecompressing || function(this_, message, error, detail, next) { | ||
this.onErrorDecompressing = options.onErrorDecompressing || function(message, error, detail, next) { | ||
return next({ | ||
@@ -35,2 +56,11 @@ msg: error, | ||
/* | ||
Transformation | ||
If an error is encountered during decompression, calls #onErrorDecompressing | ||
@see stream.Transform#_transform | ||
*/ | ||
Decompressor.prototype._transform = function(data, encoding, done) { | ||
@@ -49,3 +79,3 @@ var _this = this; | ||
if (error) { | ||
return _this.onErrorDecompressing(_this, message, 'Error unzipping', error, asyncReady); | ||
return _this.onErrorDecompressing.apply(_this, [message, 'Error unzipping', error, asyncReady]); | ||
} | ||
@@ -67,5 +97,5 @@ batched = (function() { | ||
case 2: | ||
return _this.onErrorDecompressing(_this, message, 'Snappy not implemented', null, asyncReady); | ||
return _this.onErrorDecompressing.apply(_this, [message, 'Snappy not implemented', null, asyncReady]); | ||
default: | ||
return _this.onErrorDecompressing(_this, message, 'Unknown compression: ' + message.compression, null, asyncReady); | ||
return _this.onErrorDecompressing.apply(_this, [message, 'Unknown compression: ' + message.compression, null, asyncReady]); | ||
} | ||
@@ -76,3 +106,3 @@ }, function(error, results) { | ||
} | ||
data.messages = _.flatten(results); | ||
data.messages = results; | ||
_this.push(data); | ||
@@ -79,0 +109,0 @@ return done(); |
@@ -9,5 +9,7 @@ // This file has been generated from coffee source files | ||
Message = require('prozess').Message; | ||
_ = require('underscore'); | ||
Message = require('prozess').Message; | ||
exports.Message = Message; | ||
@@ -26,2 +28,12 @@ ERR_Unknown = new Error("Unknown"); | ||
/* | ||
Fakes a set of kafka brokers | ||
Queues are memory based and implemented using Arrays. | ||
Writes pushes the provided messages into the queue. | ||
Offsets are related to the queue offset in the array (any calculation using the | ||
offset and message length will fail) | ||
*/ | ||
exports.Brokers = Brokers = (function() { | ||
@@ -34,3 +46,14 @@ var Broker, brokers; | ||
/* | ||
Fake broker, holds the queues for a broker. | ||
Topics are keys of the queue | ||
*/ | ||
Broker = (function() { | ||
/* | ||
Constructs a broker | ||
*/ | ||
function Broker() { | ||
@@ -40,2 +63,7 @@ this.queues = {}; | ||
/* | ||
Adds messages for topic-partition to the queue | ||
*/ | ||
Broker.prototype.write = function(topic, partition, messages) { | ||
@@ -55,2 +83,10 @@ var key, message, _i, _len, _results; | ||
/* | ||
Retrieves messages from the queue. | ||
Provides error on offset out of range | ||
Uses maxMessageSize to batch messages | ||
*/ | ||
Broker.prototype.read = function(topic, partition, offset, maxMessageSize, onData) { | ||
@@ -76,2 +112,7 @@ var item, key, length, | ||
/* | ||
Gets the latests offset | ||
*/ | ||
Broker.prototype.getLatestOffset = function(topic, partition) { | ||
@@ -87,2 +128,9 @@ var key; | ||
/* | ||
Connects to a fake broker. | ||
Will create the broker if not exists, otherwise will use the existing broker | ||
*/ | ||
Brokers.connect = function(host, port) { | ||
@@ -97,2 +145,7 @@ var broker; | ||
/* | ||
Returns all connected brokers | ||
*/ | ||
Brokers.getBrokers = function() { | ||
@@ -102,2 +155,9 @@ return brokers; | ||
/* | ||
Removes all brokers and their queues. | ||
For testing purposes | ||
*/ | ||
Brokers.reset = function() { | ||
@@ -111,5 +171,17 @@ return brokers = {}; | ||
/* | ||
Fake producer, interacts with the fake broker(s) | ||
API compatible with Prozess module | ||
*/ | ||
exports.Producer = Producer = (function(_super) { | ||
__extends(Producer, _super); | ||
/* | ||
Construct producer | ||
*/ | ||
function Producer(topic, options) { | ||
@@ -125,2 +197,7 @@ options = options || {}; | ||
/* | ||
Connects to a broker | ||
*/ | ||
Producer.prototype.connect = function() { | ||
@@ -131,2 +208,7 @@ this.broker = Brokers.connect(this.host, this.port); | ||
/* | ||
Send (produce) a message to the broker | ||
*/ | ||
Producer.prototype.send = function(message, options, cb) { | ||
@@ -148,3 +230,14 @@ var messages; | ||
/* | ||
Fake consumer, interacts with the fake broker(s) | ||
API compatible with zookeeper module | ||
*/ | ||
exports.Consumer = Consumer = (function() { | ||
/* | ||
Constructs the consumer | ||
*/ | ||
function Consumer(options) { | ||
@@ -161,2 +254,7 @@ options = options || {}; | ||
/* | ||
Connects to the fake broker | ||
*/ | ||
Consumer.prototype.connect = function() { | ||
@@ -166,2 +264,7 @@ return this.broker = Brokers.connect(this.host, this.port); | ||
/* | ||
Consume (read) messages from the broker | ||
*/ | ||
Consumer.prototype.consume = function(cb) { | ||
@@ -177,2 +280,7 @@ return this.broker.read(this.topic, this.partition, this.offset, this.maxMessageSize, function(err, messages) { | ||
/* | ||
Returns latests offset | ||
*/ | ||
Consumer.prototype.getLatestOffset = function(cb) { | ||
@@ -186,2 +294,7 @@ return cb(null, this.brokers.getLatestOffset(this.topic, this.partition)); | ||
/* | ||
Utility function to turn messages into an array | ||
*/ | ||
toArray = function(arg) { | ||
@@ -194,2 +307,7 @@ if (_.isArray(arg)) { | ||
/* | ||
Utility function to create real Kafka Message objects | ||
*/ | ||
toListOfMessages = function(args) { | ||
@@ -196,0 +314,0 @@ return _.map(args, function(arg) { |
// This file has been generated from coffee source files | ||
var EventEmitter, Kafkazoo, TopicConsumer, ZooKafka, zookeeper, _, | ||
var Connections, EventEmitter, Kafkazoo, zookeeper, _, | ||
__hasProp = {}.hasOwnProperty, | ||
@@ -13,9 +13,28 @@ __extends = function(child, parent) { for (var key in parent) { if (__hasProp.call(parent, key)) child[key] = parent[key]; } function ctor() { this.constructor = child; } ctor.prototype = parent.prototype; child.prototype = new ctor(); child.__super__ = parent.prototype; return child; }; | ||
TopicConsumer = require('./TopicConsumer'); | ||
Connections = require('./Connections'); | ||
ZooKafka = require('./ZooKafka'); | ||
/* | ||
Higher level client for kafka, that interacts with zookeeper. | ||
*/ | ||
module.exports = Kafkazoo = (function(_super) { | ||
__extends(Kafkazoo, _super); | ||
/* | ||
Constructs client. | ||
@event connected If successfull connected (see #connect) | ||
@event error If error | ||
@event error.message User friendly (but generic) message | ||
@event error.detail Developer friendly (original) details | ||
@param {Object} [options="{zookeeper: {connect: 'localhost:2181', root:'/'}}"] | ||
@param {Object} options.zookeeper | ||
@param {String} options.zookeeper.connect Connect string for zookeeper hosts. Comma-delimited | ||
@param {String} options.zookeeper.root Zookeeper root path | ||
@param {Object} options.zookeeper.clientConfig (optional) Full PlusClient options | ||
*/ | ||
function Kafkazoo(options) { | ||
@@ -36,7 +55,15 @@ options = _.defaults(options || {}, { | ||
this.config = _.omit(options, zookeeper); | ||
this.connections = function() {}; | ||
this.connections.zooKafka = new ZooKafka(this._zookeeper); | ||
this.connections.topicConsumer = {}; | ||
this.connections = new Connections(this._zookeeper); | ||
} | ||
/* | ||
Connect to zookeeper. | ||
Kafka connections are only made with #createProducer and #createConsumer. | ||
@event If connected | ||
@event If error | ||
*/ | ||
Kafkazoo.prototype.connect = function() { | ||
@@ -52,2 +79,9 @@ var _this = this; | ||
/* | ||
Utility method to signal failing of client | ||
@event error | ||
*/ | ||
Kafkazoo.prototype.fatal = function(message, detail) { | ||
@@ -57,6 +91,31 @@ return this.emit('error', message, detail); | ||
/* | ||
Creates a consumer for given topic | ||
@return {Object} {@link TopicConsumer} | ||
*/ | ||
Kafkazoo.prototype.createConsumer = function(topic, consumerGroup, options) { | ||
return this.connections.topicConsumer["" + consumerGroup + "-" + topic] = new TopicConsumer(this.connections, consumerGroup, topic, options); | ||
return this.connections.newTopicConsumer(consumerGroup, topic, options); | ||
}; | ||
/* | ||
Creates a producer for given topic | ||
@return {Object{ {@link TopicProducer} | ||
*/ | ||
Kafkazoo.prototype.createProducer = function(topic, options) { | ||
return false; | ||
}; | ||
/* | ||
Returns all registered (active) brokers, see ZooKafka#getAllRegisteredBrokers | ||
@param {Function} onData Callback, | ||
*/ | ||
Kafkazoo.prototype.getAllRegisteredBrokers = function(onData) { | ||
@@ -63,0 +122,0 @@ return this.connections.zooKafka.getAllRegisteredBrokers(onData); |
@@ -15,2 +15,7 @@ // This file has been generated from coffee source files | ||
/* | ||
Utility function to create properties for a class | ||
*/ | ||
Function.prototype.property = function(prop, desc) { | ||
@@ -20,5 +25,15 @@ return Object.defineProperty(this.prototype, prop, desc); | ||
/* | ||
Consumer for a specific topic partition. Wraps the underlying Prozess client. | ||
*/ | ||
module.exports = PartitionConsumer = (function(_super) { | ||
__extends(PartitionConsumer, _super); | ||
/* | ||
Current offset, as reported by Prozess client | ||
*/ | ||
PartitionConsumer.property('offset', { | ||
@@ -34,2 +49,7 @@ get: function() { | ||
/* | ||
Constructs the partition consumer | ||
*/ | ||
function PartitionConsumer(topicConsumer, topicPartition, options) { | ||
@@ -58,23 +78,35 @@ PartitionConsumer.__super__.constructor.call(this, { | ||
/* | ||
Sets up default event handlers (part of construction). | ||
*/ | ||
PartitionConsumer.prototype._defineHandlers = function(options) { | ||
this.onNoMessages = options.onNoMessages || function(this_) { | ||
this.onNoMessages = options.onNoMessages || function() { | ||
var _this = this; | ||
return setTimeout(function() { | ||
return this_.consumeNext(); | ||
return _this.consumeNext(); | ||
}, options.noMessagesTimeout || 2000); | ||
}; | ||
this.onOffsetOutOfRange = options.onOffsetOutOfRange || function(this_) { | ||
return this_.consumer.getLatestOffset(function(error, offset) { | ||
this.onOffsetOutOfRange = options.onOffsetOutOfRange || function() { | ||
var _this = this; | ||
return this.consumer.getLatestOffset(function(error, offset) { | ||
if (error) { | ||
return this_.fatal('retrieving latest offset', error); | ||
return _this.fatal('retrieving latest offset', error); | ||
} | ||
return this_._registerConsumerOffset(offset, function() { | ||
return this_.consumeNext(); | ||
return _this._registerConsumerOffset(offset, function() { | ||
return _this.consumeNext(); | ||
}); | ||
}); | ||
}; | ||
return this.onConsumptionError = options.onConsumptionError || function(this_, error) { | ||
return this_.fatal('on consumption', error); | ||
return this.onConsumptionError = options.onConsumptionError || function(error) { | ||
return this.fatal('on consumption', error); | ||
}; | ||
}; | ||
/* | ||
Connects to Kafka | ||
*/ | ||
PartitionConsumer.prototype.connect = function() { | ||
@@ -108,2 +140,7 @@ var _this = this; | ||
/* | ||
Disconnects from Kafka | ||
*/ | ||
PartitionConsumer.prototype.disconnect = function() { | ||
@@ -113,2 +150,7 @@ return this.consumer = null; | ||
/* | ||
Shut down consumer | ||
*/ | ||
PartitionConsumer.prototype.shutdown = function() { | ||
@@ -119,2 +161,7 @@ this.disconnect(); | ||
/* | ||
Utility function | ||
*/ | ||
PartitionConsumer.prototype._getPartitionConnectionAndOffsetDetails = function(onData) { | ||
@@ -124,2 +171,7 @@ return this.zooKafka.getPartitionConnectionAndOffsetDetails(this.consumerGroup, this.topicPartition, onData); | ||
/* | ||
Utility function | ||
*/ | ||
PartitionConsumer.prototype._registerConsumerOffset = function(offset, onReady) { | ||
@@ -130,4 +182,16 @@ this.zooKafka.registerConsumerOffset(this.consumerGroup, this.topicPartition, offset, onReady); | ||
/* | ||
Dummy implementation of stream.Readable#_read | ||
*/ | ||
PartitionConsumer.prototype._read = function() {}; | ||
/* | ||
Consume from Kafka. | ||
Kafka 0.7 is currently polling only. | ||
*/ | ||
PartitionConsumer.prototype.consumeNext = function() { | ||
@@ -148,9 +212,9 @@ var _this = this; | ||
if (error.message !== 'OffsetOutOfRange') { | ||
return _this.onConsumptionError(_this, error); | ||
return _this.onConsumptionError.apply(_this, [error]); | ||
} | ||
_this.emit('offsetOutOfRange', _this.partitionDetails.offset); | ||
return _this.onOffsetOutOfRange(_this); | ||
return _this.onOffsetOutOfRange.apply(_this); | ||
} | ||
if (messages.length === 0) { | ||
return _this.onNoMessages(_this); | ||
return _this.onNoMessages.apply(_this); | ||
} | ||
@@ -164,2 +228,7 @@ return _this.push({ | ||
/* | ||
Utility method | ||
*/ | ||
PartitionConsumer.prototype.fatal = function(msg, detail) { | ||
@@ -166,0 +235,0 @@ return this.emit('error', msg, detail); |
// This file has been generated from coffee source files | ||
var Compression, PartitionConsumer, Readable, StandaloneStrategy, TopicConsumer, async, util, uuid, _, | ||
var Compression, PartitionConsumer, Readable, TopicConsumer, async, rebalanceStrategy, util, uuid, _, | ||
__hasProp = {}.hasOwnProperty, | ||
@@ -21,10 +21,33 @@ __extends = function(child, parent) { for (var key in parent) { if (__hasProp.call(parent, key)) child[key] = parent[key]; } function ctor() { this.constructor = child; } ctor.prototype = parent.prototype; child.prototype = new ctor(); child.__super__ = parent.prototype; return child; }; | ||
StandaloneStrategy = require('./rebalanceStrategy/Standalone'); | ||
rebalanceStrategy = require('./rebalanceStrategy'); | ||
/* | ||
Coordinates the consumption of the various partitions and brokers | ||
where topic messages are stored. | ||
Creation is done by {@link Kafakazoo), which provides a reference to the | ||
created topic consumer. | ||
Information about this is retrieved from zookeeper, which partitions and | ||
brokers to consume is further determined by the rebalance strategy. | ||
*/ | ||
module.exports = TopicConsumer = (function(_super) { | ||
__extends(TopicConsumer, _super); | ||
/* | ||
Constructs a topic consumer. | ||
@param {Object} connections Kafkazoo connections | ||
@param {String} consumerGroup Kafka consumer group | ||
@param {String} consumerGroup Kafka topic | ||
@param {Object} options (optional) | ||
@param {Object} options.rebalanceStrategy (optional) Rebalance strategy class | ||
(defaults to StandaloneStrategy) | ||
*/ | ||
function TopicConsumer(connections, consumerGroup, topic, options) { | ||
var rebalanceStrategy, | ||
_this = this; | ||
var _this = this; | ||
TopicConsumer.__super__.constructor.call(this, { | ||
@@ -38,5 +61,4 @@ objectMode: true | ||
this.consumerId = options.consumerId || uuid.v1(); | ||
rebalanceStrategy = options.rebalanceStrategy || StandaloneStrategy; | ||
this.rebalancer = new rebalanceStrategy(this.connections, this.consumerGroup, this.topic, this.consumerId); | ||
this.rebalancer.on('partitions', this.rebalance); | ||
this.rebalanceStrategy = options.rebalanceStrategy || rebalanceStrategy.standAlone; | ||
this.on('partitions', this.rebalance); | ||
this.partitionConsumers = {}; | ||
@@ -55,8 +77,33 @@ this.partitionConsumerConfig = {}; | ||
/* | ||
Connects to zookeeper and kafka. | ||
Delegates to the rebalance strategy to deliver the partitions to read from. | ||
This is done via the ```partitions``` event, that fires #rebalance | ||
*/ | ||
TopicConsumer.prototype.connect = function() { | ||
return this.rebalancer.connect(); | ||
return this.rebalanceStrategy.apply(this); | ||
}; | ||
/* | ||
Dummy implementation of stream.Readable#_read | ||
No automatic reading | ||
*/ | ||
TopicConsumer.prototype._read = function() {}; | ||
/* | ||
Connecting and disconnection the low level partition consumers, as provided by the | ||
rebalance strategy, | ||
Currently only does initial connection | ||
TODO: implement rebalancing | ||
*/ | ||
TopicConsumer.prototype.rebalance = function(partitions) { | ||
@@ -73,2 +120,9 @@ var _this = this; | ||
/* | ||
Construction of a partition consumer. | ||
Wires the various events | ||
*/ | ||
TopicConsumer.prototype.connectPartitionConsumer = function(partition) { | ||
@@ -75,0 +129,0 @@ var event, id, partitionConsumer, _fn, _i, _len, _ref, |
// This file has been generated from coffee source files | ||
var ZooKafka, async, _; | ||
var ZooKafka, async, _, | ||
__indexOf = [].indexOf || function(item) { for (var i = 0, l = this.length; i < l; i++) { if (i in this && this[i] === item) return i; } return -1; }; | ||
@@ -9,3 +10,102 @@ async = require('async'); | ||
/* | ||
Navigating the zookeeper registry for Kafka, to be compliant with the official Kafka client | ||
v0.7 zookeeper structure, based on | ||
- [Kafka wiki](https://cwiki.apache.org/confluence/display/KAFKA/Writing+a+Driver+for+Kafka), Sep 2013 | ||
- [Kafka source](https://github.com/apache/kafka/blob/0.7/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala) | ||
General: | ||
- brokerId : number, configured on broker | ||
eg: 2 | ||
- creator : assigned name, <ip>-<epoch stamp> | ||
eg: 10.0.0.12-1324306324402 | ||
- groupId : alphanumeric, app | ||
eg: cheeseLovers | ||
- topic : alphanumeric | ||
eg: cheese | ||
- consumerId : alphanumeric, configured on consumer | ||
eg: mouse-1 | ||
- partitionId: number | ||
eg: 4 | ||
Broker registration: | ||
- path: /brokers/ids/[brokerId] | ||
- value: [creator]:[host]:[port] | ||
- eg: /brokers/ids/2 => 10.0.0.12-1324306324402:10.0.0.12:9092 | ||
Broker topic registration: | ||
- path: /brokers/topics/[topic]/[brokerId] | ||
- value: [numberOfPartitionsOnBroker] | ||
- eg: /brokers/topics/cheese/2 => 4 | ||
Consumer registration: | ||
- path: /consumers/[groupId]/ids/[consumerId] | ||
- eg: /consumers/cheeseLovers/ids/mouse-1 | ||
Consumer topic registration: | ||
- path: /consumers/[groupId]/ids/[consumerId]/[topic] | ||
- eg: /consumers/cheeseLovers/ids/mouse-1/cheese | ||
Consumer owner tracking: | ||
- path: /consumers/[groupId]/owner/[topic]/[brokerId]-[partitionId] | ||
- value: [consumerId] | ||
- eg: /consumers/cheeseLovers/owner/cheese/2-4 => mouse-1 | ||
- changes on re-balancing | ||
Consumer offset tracking: | ||
- path: /consumers/[groupId]/offsets/[topic]/[broker_id-partition_id] | ||
- value: [offsetCounterValue] | ||
- eg: /consumers/cheeseLovers/offsets/cheese/2-4 => 1024 | ||
Producers (existing topic): | ||
1. Producer created per topic | ||
2. Read /brokers/ids/[brokerId], map [brokerId] -> Kafka connection | ||
3. Read /brokers/topics/[topic]/[brokerId], map [brokerId-PartitionId] -> broker connection | ||
4. On send: pick brokerId-PartitionId | ||
Watch: | ||
- add/delete child: /brokers/ids | ||
- change value: /brokers/topics/[topic]/[brokerId] | ||
New topic on broker: | ||
- /brokers/topics/[topic]/[brokerId] does not exist yet | ||
- send to partition 0 | ||
- /brokers/topics/[topic]/[brokerId] will be updated | ||
Consumers: | ||
not always registered in zookeeper, eg when directly using Prozess | ||
normal java client behaviour, which we should follow: | ||
1. register | ||
2. trigger re-balance? | ||
Expired session: | ||
- release ownership, re-register,re-balance | ||
Re-balancing, as done by Java client | ||
- All Consumers in a ConsumerGroup will come to a consensus as to who is consuming what. | ||
- Each Broker+Topic+Partition combination is consumed by one and only one Consumer | ||
even if it means that some Consumers don't get anything at all. | ||
- A Consumer should try to have as many partitions on the same Broker as possible, | ||
sort the list by [Broker ID]-[Partition] (0-0, 0-1, 0-2, etc.), and assign them in chunks. | ||
- Consumers are sorted by their Consumer IDs. | ||
If there are three Consumers, two Brokers, and three partitions in each, the split might look like: | ||
Consumer A: [0-0, 0-1] | ||
Consumer B: [0-2, 1-0] | ||
Consumer C: [1-1, 1-2] | ||
- If the distribution can't be even and some Consumers must have more partitions than others, | ||
the extra partitions always go to the earlier consumers on the list. | ||
So you could have a distribution like 4-4-4-4 or 5-5-4-4, but never 4-4-4-5 or 4-5-4-4. | ||
TODO: cache data, watch for changes | ||
*/ | ||
module.exports = ZooKafka = (function() { | ||
/* | ||
Constructs the object, done by {@link Kafkazoo} | ||
@param {Object} client Zookeeper client | ||
@praram [options="{}"] | ||
*/ | ||
function ZooKafka(client, options) { | ||
@@ -16,2 +116,16 @@ this.client = client; | ||
/* | ||
Returns all registered (active) brokers with their connection details | ||
@param {Function} onData Callback | ||
@param onData.error | ||
@param {Object} onData.brokers | ||
@param onData.brokers.[brokerId] | ||
@param onData.brokers.[brokerId].name | ||
@param onData.brokers.[brokerId].host | ||
@param onData.brokers.[brokerId].port | ||
@param onData.brokers.[brokerId].id | ||
*/ | ||
ZooKafka.prototype.getAllRegisteredBrokers = function(onData) { | ||
@@ -42,2 +156,16 @@ var brokerMap, zkPath, | ||
/* | ||
Returns connection details for a given broker | ||
@param {String} brokerId | ||
@param {Function} onData Callback | ||
@param onData.error | ||
@param {Object} onData.broker | ||
@param onData.broker.name | ||
@param onData.broker.host | ||
@param onData.broker.port | ||
@param onData.broker.id | ||
*/ | ||
ZooKafka.prototype.getRegisteredBroker = function(brokerId, onData) { | ||
@@ -60,2 +188,11 @@ var zkPath, | ||
/* | ||
Returns all registered topics (names only) | ||
@param {Function} onData Callback | ||
@param onData.error | ||
@param {String[]} onData.topics List of topic names | ||
*/ | ||
ZooKafka.prototype.getAllRegisteredTopics = function(onData) { | ||
@@ -76,2 +213,61 @@ var zkPath, | ||
/* | ||
Get all registered (active) brokers for topic | ||
*/ | ||
ZooKafka.prototype.getRegisteredTopicBrokers = function(topic, options, onData) { | ||
var partitionMap, zkPath, | ||
_this = this; | ||
if (!onData && options && _.isFunction(options)) { | ||
onData = options; | ||
options = {}; | ||
} | ||
options = _.defaults(options || {}); | ||
zkPath = ['/brokers/topics', topic]; | ||
partitionMap = {}; | ||
return async.parallel({ | ||
activeBrokers: function(asyncReady) { | ||
return _this.getAllRegisteredBrokers(asyncReady); | ||
}, | ||
topicBrokers: function(asyncReady) { | ||
return _this.client.getChildren(zkPath, function(error, brokers) { | ||
if (error) { | ||
return onData({ | ||
msg: 'Error retrieving topic partitions', | ||
error: error | ||
}); | ||
} | ||
return asyncReady(null, brokers); | ||
}); | ||
} | ||
}, function(error, result) { | ||
var brokers; | ||
if (error) { | ||
return onData(error); | ||
} | ||
brokers = result.activeBrokers.filter(function(broker) { | ||
return __indexOf.call(result.topicBrokers, broker) >= 0; | ||
}); | ||
return onData(null, brokers); | ||
}); | ||
}; | ||
/* | ||
Returns all registered topic partitions, by default only for registered brokers | ||
@param {String} topic | ||
@param {Object} [options="{onlyRegisteredBrokers: true}"] | ||
@param {Boolean} options.onlyRegisteredBrokers | ||
@param {Function} onData Callback | ||
@param onData.error | ||
@param {Object} onData.topicPartitions | ||
@param onData.topicPartitions.[brokerId-ParttionId] | ||
@param onData.topicPartitions.[brokerId-ParttionId].topic | ||
@param onData.topicPartitions.[brokerId-ParttionId].brokerPartitionId | ||
@param onData.topicPartitions.[brokerId-ParttionId].brokerId | ||
@param onData.topicPartitions.[brokerId-ParttionId].PartitionId | ||
*/ | ||
ZooKafka.prototype.getRegisteredTopicPartitions = function(topic, options, onData) { | ||
@@ -136,2 +332,20 @@ var partitionMap, zkPath, | ||
/* | ||
Returns registered consumer offset for given topicPartition and consumerGroup | ||
If no offset is registered, 0 is returned. | ||
@param {String} consumerGroup | ||
@param {Object} topicPartition Normally use the output from #getRegisteredTopicPartitions | ||
@param {String} topicPartition.topic | ||
@param {String} topicPartition.brokerPartitionId | ||
@param {Function} onData Callback | ||
@param onData.error | ||
@param {Object} onData.registeredOffset | ||
@param {String} onData.registeredOffset.consumerGroup As provided | ||
@param {Object} onData.registeredOffset.topicPartition As provided | ||
@param {String} onData.registeredOffset.offset | ||
*/ | ||
ZooKafka.prototype.getRegisteredConsumerOffset = function(consumerGroup, topicPartition, onData) { | ||
@@ -156,2 +370,22 @@ var zkPath; | ||
/* | ||
Returns registered consumer offset for given topicPartition and consumerGroup, and the | ||
broker connection details for the topicPartition | ||
If no offset is registered, 0 is returned. | ||
@param {String} consumerGroup | ||
@param {Object} topicPartition Normally use the output from #getRegisteredTopicPartitions | ||
@param {String} topicPartition.topic | ||
@param {String} topicPartition.brokerPartitionId | ||
@param {String} topicPartition.brokerId | ||
@param {Function} onData Callback | ||
@param onData.error | ||
@param {Object} onData.details | ||
@param {Object} onData.details.topicPartition As provided | ||
@param {Object} onData.details.broker From #getRegisteredBroker | ||
@param {Object} onData.details.consumerOffset From #getRegisteredConsumerOffset | ||
*/ | ||
ZooKafka.prototype.getPartitionConnectionAndOffsetDetails = function(consumerGroup, topicPartition, onData) { | ||
@@ -176,2 +410,15 @@ var _this = this; | ||
/* | ||
Registers the provided offset for the given consumerGroup and topicPartition | ||
@param {String} consumerGroup | ||
@param {Object} topicPartition Normally use the output from #getRegisteredTopicPartitions | ||
@param {String} topicPartition.topic | ||
@param {String} topicPartition.brokerPartitionId | ||
@param {String} offset The new offset value | ||
@param {Function} onReady Callback | ||
@param onReady.error | ||
*/ | ||
ZooKafka.prototype.registerConsumerOffset = function(consumerGroup, topicPartition, offset, onReady) { | ||
@@ -178,0 +425,0 @@ var zkPath, |
@@ -53,3 +53,3 @@ // This file has been generated from coffee source files | ||
}; | ||
return decompressor.onErrorDecompressing(decompressor, {}, 'foo error', providedDetails, function(err) { | ||
return decompressor.onErrorDecompressing(decompressor, 'foo error', providedDetails, function(err) { | ||
err.msg.should.equal('foo error'); | ||
@@ -90,3 +90,3 @@ err.detail.should.equal(providedDetails); | ||
result.should.eql({ | ||
messages: [original1, original2, original1, original2], | ||
messages: [[original1, original2], [original1, original2]], | ||
offset: '123' | ||
@@ -136,3 +136,3 @@ }); | ||
result.should.eql({ | ||
messages: [payload1, original1, original2, payload2], | ||
messages: [payload1, [original1, original2], payload2], | ||
offset: '123' | ||
@@ -139,0 +139,0 @@ }); |
@@ -7,3 +7,3 @@ // This file has been generated from coffee source files | ||
FakeProzess = require('../../src/index').FakeProzess; | ||
FakeProzess = require('../../src/lib/FakeProzess'); | ||
@@ -10,0 +10,0 @@ describe('FakeProzess', function() { |
// This file has been generated from coffee source files | ||
var TopicConsumer, ZooKafka, mockery, should, sinon, _; | ||
var TopicConsumer, TopicProducer, ZooKafka, mockery, should, sinon, _; | ||
@@ -15,2 +15,4 @@ mockery = require('mockery'); | ||
TopicProducer = require('../../src/lib/TopicProducer'); | ||
ZooKafka = require('../../src/lib/ZooKafka'); | ||
@@ -55,3 +57,6 @@ | ||
})); | ||
mockery.registerAllowables(['../../src/lib/Kafkazoo', 'events', 'util', 'underscore']); | ||
mockery.registerMock('./TopicProducer', stubbedClass('topicProducer', function(stub) { | ||
return stub.connections = stub._initArgs[0]; | ||
})); | ||
mockery.registerAllowables(['../../src/lib/Kafkazoo', './Connections', 'events', 'util', 'underscore']); | ||
return Kafkazoo = require('../../src/lib/Kafkazoo'); | ||
@@ -74,2 +79,3 @@ }); | ||
stubbed['topicConsumer'] = sinon.stub(new TopicConsumer(kafkaConnectionsStub)); | ||
stubbed['topicProducer'] = sinon.stub(new TopicProducer(kafkaConnectionsStub)); | ||
return kafka = new Kafkazoo(); | ||
@@ -166,3 +172,3 @@ }); | ||
}); | ||
return describe('createConsumer', function() { | ||
describe('createConsumer', function() { | ||
it('should create a topic consumer with given group and topic', function() { | ||
@@ -180,2 +186,7 @@ kafka.createConsumer('topic', 'group'); | ||
}); | ||
return describe('createProducer', function() { | ||
return it('should create a producer wfor a given topic', function() { | ||
return kafka.createProducer('topic'); | ||
}); | ||
}); | ||
}); | ||
@@ -182,0 +193,0 @@ |
// This file has been generated from coffee source files | ||
var concat, mockery, should, sinon, stream, util, _, | ||
var FakeProzess, concat, mockery, should, sinon, stream, util, _, | ||
__indexOf = [].indexOf || function(item) { for (var i = 0, l = this.length; i < l; i++) { if (i in this && this[i] === item) return i; } return -1; }; | ||
@@ -20,2 +20,4 @@ | ||
FakeProzess = require('../../src/lib/FakeProzess'); | ||
describe('PartitionConsumer class', function() { | ||
@@ -281,3 +283,3 @@ var DEFAULT_NOMSG_TIMEOUT, PartitionConsumer, ProzessConsumerStub, ProzessStub, TopicConsumerStub, ZookeeperClientStub, createTestPartitionConsumer, fakeClock, given, partitionConsumer; | ||
}); | ||
return partitionConsumer.onConsumptionError(partitionConsumer, 'foo error'); | ||
return partitionConsumer.onConsumptionError('foo error'); | ||
}); | ||
@@ -540,3 +542,3 @@ }); | ||
delegated.calledOnce.should.be["true"]; | ||
return delegated.calledWith(partitionConsumer).should.be["true"]; | ||
return delegated.calledWith().should.be["true"]; | ||
}); | ||
@@ -568,3 +570,3 @@ it('should try another consumeNext by default', function() { | ||
delegated.calledOnce.should.be["true"]; | ||
return delegated.calledWith(partitionConsumer, 'foo!').should.be["true"]; | ||
return delegated.calledWith('foo!').should.be["true"]; | ||
}); | ||
@@ -621,4 +623,3 @@ it('should emit error event by default', function() { | ||
partitionConsumer.consumeNext(); | ||
delegated.calledOnce.should.be["true"]; | ||
return delegated.calledWith(partitionConsumer).should.be["true"]; | ||
return delegated.calledOnce.should.be["true"]; | ||
}); | ||
@@ -625,0 +626,0 @@ it('should retry consume by default (after a timeout)', function() { |
@@ -29,12 +29,5 @@ // This file has been generated from coffee source files | ||
before(function() { | ||
StrategyStub = (function() { | ||
function StrategyStub() {} | ||
StrategyStub.prototype.on = function() {}; | ||
StrategyStub.prototype.connect = function() {}; | ||
return StrategyStub; | ||
})(); | ||
StrategyStub = { | ||
standAlone: function() {} | ||
}; | ||
PartitionConsumerStub = (function(_super) { | ||
@@ -69,3 +62,3 @@ __extends(PartitionConsumerStub, _super); | ||
mockery.registerMock('./PartitionConsumer', PartitionConsumerStub); | ||
mockery.registerMock('./rebalanceStrategy/Standalone', StrategyStub); | ||
mockery.registerMock('./rebalanceStrategy', StrategyStub); | ||
mockery.registerAllowables(['stream', 'util', 'async', 'underscore', 'zlib', 'crypto', './Compression', '../../src/lib/TopicConsumer']); | ||
@@ -123,19 +116,14 @@ return TopicConsumer = require('../../src/lib/TopicConsumer'); | ||
topicConsumer.partitionConsumers.should.eql({}); | ||
return topicConsumer.rebalancer.should.be.instanceOf(StrategyStub); | ||
return topicConsumer.rebalanceStrategy.should.be.equal(StrategyStub.standAlone); | ||
}); | ||
it('can be given a strategy class', function() { | ||
var StrategyStub2; | ||
StrategyStub2 = (function() { | ||
function StrategyStub2() {} | ||
StrategyStub2.prototype.on = function() {}; | ||
return StrategyStub2; | ||
})(); | ||
it('can be given a rebalancer strategy', function() { | ||
var strategy; | ||
strategy = function() { | ||
return this.emit('error', 'not called'); | ||
}; | ||
topicConsumer = new TopicConsumer(zkClientStub, 'groupA', 'foo', { | ||
rebalanceStrategy: StrategyStub2 | ||
rebalanceStrategy: strategy | ||
}); | ||
topicConsumer.rebalancer.should.not.be.instanceOf(StrategyStub); | ||
return topicConsumer.rebalancer.should.be.instanceOf(StrategyStub2); | ||
topicConsumer.rebalanceStrategy.should.not.be.equal(StrategyStub.standAlone); | ||
return topicConsumer.rebalanceStrategy.should.be.equal(strategy); | ||
}); | ||
@@ -160,6 +148,6 @@ return it('should stream data via preprocess', function(done) { | ||
describe('#connect', function() { | ||
return it('should connect the used rebalance strategy', function() { | ||
sinon.spy(topicConsumer.rebalancer, 'connect'); | ||
return it('should call the used rebalance strategy', function() { | ||
sinon.stub(topicConsumer, 'rebalanceStrategy'); | ||
topicConsumer.connect(); | ||
return topicConsumer.rebalancer.connect.calledOnce.should.be["true"]; | ||
return topicConsumer.rebalanceStrategy.calledOnce.should.be["true"]; | ||
}); | ||
@@ -166,0 +154,0 @@ }); |
@@ -11,3 +11,3 @@ { | ||
], | ||
"version": "0.1.1", | ||
"version": "0.2.0", | ||
"repository": { | ||
@@ -14,0 +14,0 @@ "type": "git", |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
277834
22.29%85
26.87%3088
28.56%