
kafkazoo


kafkazoo - npm Package Compare versions

Comparing version 0.1.1 to 0.2.0

@@ -5,6 +5,4 @@ // This file has been generated from coffee source files

exports.FakeProzess = require('./lib/FakeProzess');
/*
//@ sourceMappingURL=index.js.map
*/

@@ -9,4 +9,2 @@ // This file has been generated from coffee source files

async = require('async');
zlib = require('zlib');

@@ -16,7 +14,30 @@

async = require('async');
_ = require('underscore');
/*
Decompressing messages received from Kafka
Transformation stream
TODO: implement snappy
*/
exports.Decompressor = Decompressor = (function(_super) {
__extends(Decompressor, _super);
/*
Constructs the decompressor.
onErrorDecompressing: function(message, error, detail, next) {
next({msg: error, detail: detail});
};
@param {Object} options (optional)
@param {Function} options.onErrorDecompressing (optional)
*/
function Decompressor(options) {

@@ -26,3 +47,3 @@ options = options || {};

Decompressor.__super__.constructor.call(this, options);
this.onErrorDecompressing = options.onErrorDecompressing || function(this_, message, error, detail, next) {
this.onErrorDecompressing = options.onErrorDecompressing || function(message, error, detail, next) {
return next({

@@ -35,2 +56,11 @@ msg: error,

/*
Transformation
If an error is encountered during decompression, calls #onErrorDecompressing
@see stream.Transform#_transform
*/
Decompressor.prototype._transform = function(data, encoding, done) {

@@ -49,3 +79,3 @@ var _this = this;

if (error) {
return _this.onErrorDecompressing(_this, message, 'Error unzipping', error, asyncReady);
return _this.onErrorDecompressing.apply(_this, [message, 'Error unzipping', error, asyncReady]);
}

@@ -67,5 +97,5 @@ batched = (function() {

case 2:
return _this.onErrorDecompressing(_this, message, 'Snappy not implemented', null, asyncReady);
return _this.onErrorDecompressing.apply(_this, [message, 'Snappy not implemented', null, asyncReady]);
default:
return _this.onErrorDecompressing(_this, message, 'Unknown compression: ' + message.compression, null, asyncReady);
return _this.onErrorDecompressing.apply(_this, [message, 'Unknown compression: ' + message.compression, null, asyncReady]);
}

@@ -76,3 +106,3 @@ }, function(error, results) {

}
data.messages = _.flatten(results);
data.messages = results;
_this.push(data);

@@ -79,0 +109,0 @@ return done();
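For orientation, here is how the reworked Decompressor might be used in 0.2.0. This is a minimal sketch, not from the package docs: the require path and the object-mode data shape (`{messages, offset}`) are assumptions read off the diff above; the user-visible change is that `onErrorDecompressing` no longer receives a leading `this_` argument.

```js
// Minimal sketch, assuming the compiled file lives at ./lib/Compression
// and that batches flow through in object mode as {messages, offset}.
var Decompressor = require('./lib/Compression').Decompressor;

var decompressor = new Decompressor({
  // 0.2.0 invokes this with the instance bound as `this`;
  // the old leading `this_` argument is gone.
  onErrorDecompressing: function (message, error, detail, next) {
    console.error('decompression failed:', error, detail);
    return next({ msg: error, detail: detail });
  }
});

decompressor.on('data', function (data) {
  console.log(data.messages.length, 'messages up to offset', data.offset);
});
```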

@@ -9,5 +9,7 @@ // This file has been generated from coffee source files

Message = require('prozess').Message;
_ = require('underscore');
Message = require('prozess').Message;
exports.Message = Message;

@@ -26,2 +28,12 @@ ERR_Unknown = new Error("Unknown");

/*
Fakes a set of kafka brokers
Queues are memory based and implemented using Arrays.
A write pushes the provided messages onto the queue.
Offsets are plain array indices (any calculation using the
offset and message length will fail)
*/
exports.Brokers = Brokers = (function() {

@@ -34,3 +46,14 @@ var Broker, brokers;

/*
Fake broker, holds the queues for a broker.
Topics are keys of the queue
*/
Broker = (function() {
/*
Constructs a broker
*/
function Broker() {

@@ -40,2 +63,7 @@ this.queues = {};

/*
Adds messages for topic-partition to the queue
*/
Broker.prototype.write = function(topic, partition, messages) {

@@ -55,2 +83,10 @@ var key, message, _i, _len, _results;

/*
Retrieves messages from the queue.
Provides error on offset out of range
Uses maxMessageSize to batch messages
*/
Broker.prototype.read = function(topic, partition, offset, maxMessageSize, onData) {

@@ -76,2 +112,7 @@ var item, key, length,

/*
Gets the latest offset
*/
Broker.prototype.getLatestOffset = function(topic, partition) {

@@ -87,2 +128,9 @@ var key;

/*
Connects to a fake broker.
Creates the broker if it does not exist, otherwise uses the existing broker
*/
Brokers.connect = function(host, port) {

@@ -97,2 +145,7 @@ var broker;

/*
Returns all connected brokers
*/
Brokers.getBrokers = function() {

@@ -102,2 +155,9 @@ return brokers;

/*
Removes all brokers and their queues.
For testing purposes
*/
Brokers.reset = function() {

@@ -111,5 +171,17 @@ return brokers = {};

/*
Fake producer, interacts with the fake broker(s)
API compatible with Prozess module
*/
exports.Producer = Producer = (function(_super) {
__extends(Producer, _super);
/*
Construct producer
*/
function Producer(topic, options) {

@@ -125,2 +197,7 @@ options = options || {};

/*
Connects to a broker
*/
Producer.prototype.connect = function() {

@@ -131,2 +208,7 @@ this.broker = Brokers.connect(this.host, this.port);

/*
Send (produce) a message to the broker
*/
Producer.prototype.send = function(message, options, cb) {

@@ -148,3 +230,14 @@ var messages;

/*
Fake consumer, interacts with the fake broker(s)
API compatible with zookeeper module
*/
exports.Consumer = Consumer = (function() {
/*
Constructs the consumer
*/
function Consumer(options) {

@@ -161,2 +254,7 @@ options = options || {};

/*
Connects to the fake broker
*/
Consumer.prototype.connect = function() {

@@ -166,2 +264,7 @@ return this.broker = Brokers.connect(this.host, this.port);

/*
Consume (read) messages from the broker
*/
Consumer.prototype.consume = function(cb) {

@@ -177,2 +280,7 @@ return this.broker.read(this.topic, this.partition, this.offset, this.maxMessageSize, function(err, messages) {

/*
Returns latest offset
*/
Consumer.prototype.getLatestOffset = function(cb) {

@@ -186,2 +294,7 @@ return cb(null, this.brokers.getLatestOffset(this.topic, this.partition));

/*
Utility function to turn messages into an array
*/
toArray = function(arg) {

@@ -194,2 +307,7 @@ if (_.isArray(arg)) {

/*
Utility function to create real Kafka Message objects
*/
toListOfMessages = function(args) {

@@ -196,0 +314,0 @@ return _.map(args, function(arg) {
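Taken together, the FakeProzess additions give an in-memory stand-in for the Prozess producer/consumer pair. A hedged usage sketch follows; the option names (host, topic, partition, offset) are inferred from the constructor bodies in this diff, and the top-level export comes from the index.js change above:

```js
// Sketch only: option names are inferred from the diff, not from docs.
var FakeProzess = require('kafkazoo').FakeProzess;

var producer = new FakeProzess.Producer('cheese', { host: 'localhost' });
producer.connect();
producer.send('hello', {}, function (error) {
  var consumer = new FakeProzess.Consumer({ topic: 'cheese', partition: 0, offset: 0 });
  consumer.connect();
  consumer.consume(function (error, messages) {
    console.log(messages); // offsets are plain array indices, per the comment above
  });
});

FakeProzess.Brokers.reset(); // wipe all fake brokers between tests
```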

// This file has been generated from coffee source files
var EventEmitter, Kafkazoo, TopicConsumer, ZooKafka, zookeeper, _,
var Connections, EventEmitter, Kafkazoo, zookeeper, _,
__hasProp = {}.hasOwnProperty,

@@ -13,9 +13,28 @@ __extends = function(child, parent) { for (var key in parent) { if (__hasProp.call(parent, key)) child[key] = parent[key]; } function ctor() { this.constructor = child; } ctor.prototype = parent.prototype; child.prototype = new ctor(); child.__super__ = parent.prototype; return child; };

TopicConsumer = require('./TopicConsumer');
Connections = require('./Connections');
ZooKafka = require('./ZooKafka');
/*
Higher level client for Kafka that interacts with zookeeper.
*/
module.exports = Kafkazoo = (function(_super) {
__extends(Kafkazoo, _super);
/*
Constructs client.
@event connected If successfully connected (see #connect)
@event error If error
@event error.message User friendly (but generic) message
@event error.detail Developer friendly (original) details
@param {Object} [options="{zookeeper: {connect: 'localhost:2181', root:'/'}}"]
@param {Object} options.zookeeper
@param {String} options.zookeeper.connect Connect string for zookeeper hosts. Comma-delimited
@param {String} options.zookeeper.root Zookeeper root path
@param {Object} options.zookeeper.clientConfig (optional) Full PlusClient options
*/
function Kafkazoo(options) {

@@ -36,7 +55,15 @@ options = _.defaults(options || {}, {

this.config = _.omit(options, zookeeper);
this.connections = function() {};
this.connections.zooKafka = new ZooKafka(this._zookeeper);
this.connections.topicConsumer = {};
this.connections = new Connections(this._zookeeper);
}
/*
Connect to zookeeper.
Kafka connections are only made with #createProducer and #createConsumer.
@event connected If connected
@event error If error
*/
Kafkazoo.prototype.connect = function() {

@@ -52,2 +79,9 @@ var _this = this;

/*
Utility method to signal failure of the client
@event error
*/
Kafkazoo.prototype.fatal = function(message, detail) {

@@ -57,6 +91,31 @@ return this.emit('error', message, detail);

/*
Creates a consumer for given topic
@return {Object} {@link TopicConsumer}
*/
Kafkazoo.prototype.createConsumer = function(topic, consumerGroup, options) {
return this.connections.topicConsumer["" + consumerGroup + "-" + topic] = new TopicConsumer(this.connections, consumerGroup, topic, options);
return this.connections.newTopicConsumer(consumerGroup, topic, options);
};
/*
Creates a producer for given topic
@return {Object} {@link TopicProducer}
*/
Kafkazoo.prototype.createProducer = function(topic, options) {
return false;
};
/*
Returns all registered (active) brokers, see ZooKafka#getAllRegisteredBrokers
@param {Function} onData Callback
*/
Kafkazoo.prototype.getAllRegisteredBrokers = function(onData) {

@@ -63,0 +122,0 @@ return this.connections.zooKafka.getAllRegisteredBrokers(onData);
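The constructor doc above spells out the zookeeper options; below is a minimal sketch of the 0.2.0 client flow. The defaults come from the doc comment, the event names from the @event annotations, and the createConsumer call matches the test further down this page; treat anything else as illustrative.

```js
var Kafkazoo = require('kafkazoo');

// Defaults per the doc comment: {zookeeper: {connect: 'localhost:2181', root: '/'}}.
var kafka = new Kafkazoo({
  zookeeper: { connect: 'localhost:2181', root: '/' }
});

kafka.on('connected', function () {
  // createConsumer(topic, consumerGroup, options) per the signature above.
  var consumer = kafka.createConsumer('cheese', 'cheeseLovers');
  consumer.connect();
});

kafka.on('error', function (message, detail) {
  console.error(message, detail); // generic message plus original detail
});

kafka.connect();
```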

@@ -15,2 +15,7 @@ // This file has been generated from coffee source files

/*
Utility function to create properties for a class
*/
Function.prototype.property = function(prop, desc) {

@@ -20,5 +25,15 @@ return Object.defineProperty(this.prototype, prop, desc);

/*
Consumer for a specific topic partition. Wraps the underlying Prozess client.
*/
module.exports = PartitionConsumer = (function(_super) {
__extends(PartitionConsumer, _super);
/*
Current offset, as reported by Prozess client
*/
PartitionConsumer.property('offset', {

@@ -34,2 +49,7 @@ get: function() {

/*
Constructs the partition consumer
*/
function PartitionConsumer(topicConsumer, topicPartition, options) {

@@ -58,23 +78,35 @@ PartitionConsumer.__super__.constructor.call(this, {

/*
Sets up default event handlers (part of construction).
*/
PartitionConsumer.prototype._defineHandlers = function(options) {
this.onNoMessages = options.onNoMessages || function(this_) {
this.onNoMessages = options.onNoMessages || function() {
var _this = this;
return setTimeout(function() {
return this_.consumeNext();
return _this.consumeNext();
}, options.noMessagesTimeout || 2000);
};
this.onOffsetOutOfRange = options.onOffsetOutOfRange || function(this_) {
return this_.consumer.getLatestOffset(function(error, offset) {
this.onOffsetOutOfRange = options.onOffsetOutOfRange || function() {
var _this = this;
return this.consumer.getLatestOffset(function(error, offset) {
if (error) {
return this_.fatal('retrieving latest offset', error);
return _this.fatal('retrieving latest offset', error);
}
return this_._registerConsumerOffset(offset, function() {
return this_.consumeNext();
return _this._registerConsumerOffset(offset, function() {
return _this.consumeNext();
});
});
};
return this.onConsumptionError = options.onConsumptionError || function(this_, error) {
return this_.fatal('on consumption', error);
return this.onConsumptionError = options.onConsumptionError || function(error) {
return this.fatal('on consumption', error);
};
};
/*
Connects to Kafka
*/
PartitionConsumer.prototype.connect = function() {

@@ -108,2 +140,7 @@ var _this = this;

/*
Disconnects from Kafka
*/
PartitionConsumer.prototype.disconnect = function() {

@@ -113,2 +150,7 @@ return this.consumer = null;

/*
Shut down consumer
*/
PartitionConsumer.prototype.shutdown = function() {

@@ -119,2 +161,7 @@ this.disconnect();

/*
Utility function
*/
PartitionConsumer.prototype._getPartitionConnectionAndOffsetDetails = function(onData) {

@@ -124,2 +171,7 @@ return this.zooKafka.getPartitionConnectionAndOffsetDetails(this.consumerGroup, this.topicPartition, onData);

/*
Utility function
*/
PartitionConsumer.prototype._registerConsumerOffset = function(offset, onReady) {

@@ -130,4 +182,16 @@ this.zooKafka.registerConsumerOffset(this.consumerGroup, this.topicPartition, offset, onReady);

/*
Dummy implementation of stream.Readable#_read
*/
PartitionConsumer.prototype._read = function() {};
/*
Consume from Kafka.
Kafka 0.7 is currently polling only.
*/
PartitionConsumer.prototype.consumeNext = function() {

@@ -148,9 +212,9 @@ var _this = this;

if (error.message !== 'OffsetOutOfRange') {
return _this.onConsumptionError(_this, error);
return _this.onConsumptionError.apply(_this, [error]);
}
_this.emit('offsetOutOfRange', _this.partitionDetails.offset);
return _this.onOffsetOutOfRange(_this);
return _this.onOffsetOutOfRange.apply(_this);
}
if (messages.length === 0) {
return _this.onNoMessages(_this);
return _this.onNoMessages.apply(_this);
}

@@ -164,2 +228,7 @@ return _this.push({

/*
Utility method
*/
PartitionConsumer.prototype.fatal = function(msg, detail) {

@@ -166,0 +235,0 @@ return this.emit('error', msg, detail);
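The handler rewiring above is the user-visible API change in PartitionConsumer: onNoMessages, onOffsetOutOfRange and onConsumptionError are now applied with the consumer itself bound as `this`, instead of receiving a `this_` argument. A hedged sketch of handlers written for the new convention; the option names and the 2000ms default are taken from the diff, the handler bodies are assumptions:

```js
// Handlers updated for the 0.2.0 calling convention: `this` is the
// partition consumer, and the old leading `this_` argument is gone.
var handlers = {
  onNoMessages: function () {
    var self = this;
    // Poll again sooner than the 2000ms default shown in the diff.
    setTimeout(function () { self.consumeNext(); }, 500);
  },
  onConsumptionError: function (error) {
    // Same effect as the default handler; a real app might retry instead.
    this.fatal('on consumption', error);
  }
};
```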

// This file has been generated from coffee source files
var Compression, PartitionConsumer, Readable, StandaloneStrategy, TopicConsumer, async, util, uuid, _,
var Compression, PartitionConsumer, Readable, TopicConsumer, async, rebalanceStrategy, util, uuid, _,
__hasProp = {}.hasOwnProperty,

@@ -21,10 +21,33 @@ __extends = function(child, parent) { for (var key in parent) { if (__hasProp.call(parent, key)) child[key] = parent[key]; } function ctor() { this.constructor = child; } ctor.prototype = parent.prototype; child.prototype = new ctor(); child.__super__ = parent.prototype; return child; };

StandaloneStrategy = require('./rebalanceStrategy/Standalone');
rebalanceStrategy = require('./rebalanceStrategy');
/*
Coordinates the consumption of the various partitions and brokers
where topic messages are stored.
Creation is done by {@link Kafkazoo}, which keeps a reference to the
created topic consumer.
Information about the topic is retrieved from zookeeper; which partitions and
brokers to consume is further determined by the rebalance strategy.
*/
module.exports = TopicConsumer = (function(_super) {
__extends(TopicConsumer, _super);
/*
Constructs a topic consumer.
@param {Object} connections Kafkazoo connections
@param {String} consumerGroup Kafka consumer group
@param {String} topic Kafka topic
@param {Object} options (optional)
@param {Object} options.rebalanceStrategy (optional) Rebalance strategy class
(defaults to StandaloneStrategy)
*/
function TopicConsumer(connections, consumerGroup, topic, options) {
var rebalanceStrategy,
_this = this;
var _this = this;
TopicConsumer.__super__.constructor.call(this, {

@@ -38,5 +61,4 @@ objectMode: true

this.consumerId = options.consumerId || uuid.v1();
rebalanceStrategy = options.rebalanceStrategy || StandaloneStrategy;
this.rebalancer = new rebalanceStrategy(this.connections, this.consumerGroup, this.topic, this.consumerId);
this.rebalancer.on('partitions', this.rebalance);
this.rebalanceStrategy = options.rebalanceStrategy || rebalanceStrategy.standAlone;
this.on('partitions', this.rebalance);
this.partitionConsumers = {};

@@ -55,8 +77,33 @@ this.partitionConsumerConfig = {};

/*
Connects to zookeeper and kafka.
Delegates to the rebalance strategy to deliver the partitions to read from.
This is done via the ```partitions``` event, which fires #rebalance
*/
TopicConsumer.prototype.connect = function() {
return this.rebalancer.connect();
return this.rebalanceStrategy.apply(this);
};
/*
Dummy implementation of stream.Readable#_read
No automatic reading
*/
TopicConsumer.prototype._read = function() {};
/*
Connecting and disconnecting the low level partition consumers, as provided by the
rebalance strategy.
Currently only does the initial connection
TODO: implement rebalancing
*/
TopicConsumer.prototype.rebalance = function(partitions) {

@@ -73,2 +120,9 @@ var _this = this;

/*
Construction of a partition consumer.
Wires the various events
*/
TopicConsumer.prototype.connectPartitionConsumer = function(partition) {

@@ -75,0 +129,0 @@ var event, id, partitionConsumer, _fn, _i, _len, _ref,
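TopicConsumer's strategy hook changed shape in 0.2.0: instead of a class with its own connect() and 'partitions' event, rebalanceStrategy is now a plain function that connect() applies with the topic consumer as `this`, and the consumer listens for its own 'partitions' event. A sketch of a custom strategy under that contract, continuing the Kafkazoo example above; the partition object shape is an assumption based on the ZooKafka docs further down:

```js
// Hypothetical fixed-assignment strategy for the 0.2.0 contract:
// emit 'partitions' on the topic consumer (`this`) to trigger #rebalance.
var singlePartitionStrategy = function () {
  this.emit('partitions', [
    { topic: this.topic, brokerPartitionId: '0-0', brokerId: '0', partitionId: '0' }
  ]);
};

var consumer = kafka.createConsumer('cheese', 'cheeseLovers', {
  rebalanceStrategy: singlePartitionStrategy
});
consumer.connect(); // applies the strategy, which fires 'partitions'
```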

// This file has been generated from coffee source files
var ZooKafka, async, _;
var ZooKafka, async, _,
__indexOf = [].indexOf || function(item) { for (var i = 0, l = this.length; i < l; i++) { if (i in this && this[i] === item) return i; } return -1; };

@@ -9,3 +10,102 @@ async = require('async');

/*
Navigating the zookeeper registry for Kafka, to be compliant with the official Kafka client
v0.7 zookeeper structure, based on
- [Kafka wiki](https://cwiki.apache.org/confluence/display/KAFKA/Writing+a+Driver+for+Kafka), Sep 2013
- [Kafka source](https://github.com/apache/kafka/blob/0.7/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala)
General:
- brokerId : number, configured on broker
eg: 2
- creator : assigned name, <ip>-<epoch stamp>
eg: 10.0.0.12-1324306324402
- groupId : alphanumeric, app
eg: cheeseLovers
- topic : alphanumeric
eg: cheese
- consumerId : alphanumeric, configured on consumer
eg: mouse-1
- partitionId: number
eg: 4
Broker registration:
- path: /brokers/ids/[brokerId]
- value: [creator]:[host]:[port]
- eg: /brokers/ids/2 => 10.0.0.12-1324306324402:10.0.0.12:9092
Broker topic registration:
- path: /brokers/topics/[topic]/[brokerId]
- value: [numberOfPartitionsOnBroker]
- eg: /brokers/topics/cheese/2 => 4
Consumer registration:
- path: /consumers/[groupId]/ids/[consumerId]
- eg: /consumers/cheeseLovers/ids/mouse-1
Consumer topic registration:
- path: /consumers/[groupId]/ids/[consumerId]/[topic]
- eg: /consumers/cheeseLovers/ids/mouse-1/cheese
Consumer owner tracking:
- path: /consumers/[groupId]/owner/[topic]/[brokerId]-[partitionId]
- value: [consumerId]
- eg: /consumers/cheeseLovers/owner/cheese/2-4 => mouse-1
- changes on re-balancing
Consumer offset tracking:
- path: /consumers/[groupId]/offsets/[topic]/[broker_id-partition_id]
- value: [offsetCounterValue]
- eg: /consumers/cheeseLovers/offsets/cheese/2-4 => 1024
Producers (existing topic):
1. Producer created per topic
2. Read /brokers/ids/[brokerId], map [brokerId] -> Kafka connection
3. Read /brokers/topics/[topic]/[brokerId], map [brokerId-PartitionId] -> broker connection
4. On send: pick brokerId-PartitionId
Watch:
- add/delete child: /brokers/ids
- change value: /brokers/topics/[topic]/[brokerId]
New topic on broker:
- /brokers/topics/[topic]/[brokerId] does not exist yet
- send to partition 0
- /brokers/topics/[topic]/[brokerId] will be updated
Consumers:
not always registered in zookeeper, eg when directly using Prozess
normal java client behaviour, which we should follow:
1. register
2. trigger re-balance?
Expired session:
- release ownership, re-register, re-balance
Re-balancing, as done by Java client
- All Consumers in a ConsumerGroup will come to a consensus as to who is consuming what.
- Each Broker+Topic+Partition combination is consumed by one and only one Consumer
even if it means that some Consumers don't get anything at all.
- A Consumer should try to have as many partitions on the same Broker as possible,
sort the list by [Broker ID]-[Partition] (0-0, 0-1, 0-2, etc.), and assign them in chunks.
- Consumers are sorted by their Consumer IDs.
If there are three Consumers, two Brokers, and three partitions in each, the split might look like:
Consumer A: [0-0, 0-1]
Consumer B: [0-2, 1-0]
Consumer C: [1-1, 1-2]
- If the distribution can't be even and some Consumers must have more partitions than others,
the extra partitions always go to the earlier consumers on the list.
So you could have a distribution like 4-4-4-4 or 5-5-4-4, but never 4-4-4-5 or 4-5-4-4.
TODO: cache data, watch for changes
*/
module.exports = ZooKafka = (function() {
/*
Constructs the object, done by {@link Kafkazoo}
@param {Object} client Zookeeper client
@param [options="{}"]
*/
function ZooKafka(client, options) {

@@ -16,2 +116,16 @@ this.client = client;

/*
Returns all registered (active) brokers with their connection details
@param {Function} onData Callback
@param onData.error
@param {Object} onData.brokers
@param onData.brokers.[brokerId]
@param onData.brokers.[brokerId].name
@param onData.brokers.[brokerId].host
@param onData.brokers.[brokerId].port
@param onData.brokers.[brokerId].id
*/
ZooKafka.prototype.getAllRegisteredBrokers = function(onData) {

@@ -42,2 +156,16 @@ var brokerMap, zkPath,

/*
Returns connection details for a given broker
@param {String} brokerId
@param {Function} onData Callback
@param onData.error
@param {Object} onData.broker
@param onData.broker.name
@param onData.broker.host
@param onData.broker.port
@param onData.broker.id
*/
ZooKafka.prototype.getRegisteredBroker = function(brokerId, onData) {

@@ -60,2 +188,11 @@ var zkPath,

/*
Returns all registered topics (names only)
@param {Function} onData Callback
@param onData.error
@param {String[]} onData.topics List of topic names
*/
ZooKafka.prototype.getAllRegisteredTopics = function(onData) {

@@ -76,2 +213,61 @@ var zkPath,

/*
Get all registered (active) brokers for topic
*/
ZooKafka.prototype.getRegisteredTopicBrokers = function(topic, options, onData) {
var partitionMap, zkPath,
_this = this;
if (!onData && options && _.isFunction(options)) {
onData = options;
options = {};
}
options = _.defaults(options || {});
zkPath = ['/brokers/topics', topic];
partitionMap = {};
return async.parallel({
activeBrokers: function(asyncReady) {
return _this.getAllRegisteredBrokers(asyncReady);
},
topicBrokers: function(asyncReady) {
return _this.client.getChildren(zkPath, function(error, brokers) {
if (error) {
return onData({
msg: 'Error retrieving topic partitions',
error: error
});
}
return asyncReady(null, brokers);
});
}
}, function(error, result) {
var brokers;
if (error) {
return onData(error);
}
brokers = result.activeBrokers.filter(function(broker) {
return __indexOf.call(result.topicBrokers, broker) >= 0;
});
return onData(null, brokers);
});
};
/*
Returns all registered topic partitions, by default only for registered brokers
@param {String} topic
@param {Object} [options="{onlyRegisteredBrokers: true}"]
@param {Boolean} options.onlyRegisteredBrokers
@param {Function} onData Callback
@param onData.error
@param {Object} onData.topicPartitions
@param onData.topicPartitions.[brokerId-PartitionId]
@param onData.topicPartitions.[brokerId-PartitionId].topic
@param onData.topicPartitions.[brokerId-PartitionId].brokerPartitionId
@param onData.topicPartitions.[brokerId-PartitionId].brokerId
@param onData.topicPartitions.[brokerId-PartitionId].partitionId
*/
ZooKafka.prototype.getRegisteredTopicPartitions = function(topic, options, onData) {

@@ -136,2 +332,20 @@ var partitionMap, zkPath,

/*
Returns registered consumer offset for given topicPartition and consumerGroup
If no offset is registered, 0 is returned.
@param {String} consumerGroup
@param {Object} topicPartition Normally use the output from #getRegisteredTopicPartitions
@param {String} topicPartition.topic
@param {String} topicPartition.brokerPartitionId
@param {Function} onData Callback
@param onData.error
@param {Object} onData.registeredOffset
@param {String} onData.registeredOffset.consumerGroup As provided
@param {Object} onData.registeredOffset.topicPartition As provided
@param {String} onData.registeredOffset.offset
*/
ZooKafka.prototype.getRegisteredConsumerOffset = function(consumerGroup, topicPartition, onData) {

@@ -156,2 +370,22 @@ var zkPath;

/*
Returns registered consumer offset for given topicPartition and consumerGroup, and the
broker connection details for the topicPartition
If no offset is registered, 0 is returned.
@param {String} consumerGroup
@param {Object} topicPartition Normally use the output from #getRegisteredTopicPartitions
@param {String} topicPartition.topic
@param {String} topicPartition.brokerPartitionId
@param {String} topicPartition.brokerId
@param {Function} onData Callback
@param onData.error
@param {Object} onData.details
@param {Object} onData.details.topicPartition As provided
@param {Object} onData.details.broker From #getRegisteredBroker
@param {Object} onData.details.consumerOffset From #getRegisteredConsumerOffset
*/
ZooKafka.prototype.getPartitionConnectionAndOffsetDetails = function(consumerGroup, topicPartition, onData) {

@@ -176,2 +410,15 @@ var _this = this;

/*
Registers the provided offset for the given consumerGroup and topicPartition
@param {String} consumerGroup
@param {Object} topicPartition Normally use the output from #getRegisteredTopicPartitions
@param {String} topicPartition.topic
@param {String} topicPartition.brokerPartitionId
@param {String} offset The new offset value
@param {Function} onReady Callback
@param onReady.error
*/
ZooKafka.prototype.registerConsumerOffset = function(consumerGroup, topicPartition, offset, onReady) {

@@ -178,0 +425,0 @@ var zkPath,
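The re-balancing rules quoted in the ZooKafka comment (sorted partitions, sorted consumer ids, chunked assignment, extras to the earlier consumers) are easy to check with a few lines of code. This helper is illustrative only and not part of the package:

```js
// Sketch of the chunked split described in the comment above.
function rebalanceSplit(consumerIds, partitions) {
  var consumers = consumerIds.slice().sort();
  var parts = partitions.slice().sort(); // '0-0', '0-1', ... per [brokerId]-[partition]
  var base = Math.floor(parts.length / consumers.length);
  var extra = parts.length % consumers.length;
  var result = {};
  var cursor = 0;
  consumers.forEach(function (id, i) {
    var take = base + (i < extra ? 1 : 0); // earlier consumers absorb the remainder
    result[id] = parts.slice(cursor, cursor + take);
    cursor += take;
  });
  return result;
}

// Reproduces the example above: three consumers, two brokers, three partitions each.
// => { A: ['0-0','0-1'], B: ['0-2','1-0'], C: ['1-1','1-2'] }
rebalanceSplit(['A', 'B', 'C'], ['0-0', '0-1', '0-2', '1-0', '1-1', '1-2']);
```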

@@ -53,3 +53,3 @@ // This file has been generated from coffee source files

};
return decompressor.onErrorDecompressing(decompressor, {}, 'foo error', providedDetails, function(err) {
return decompressor.onErrorDecompressing(decompressor, 'foo error', providedDetails, function(err) {
err.msg.should.equal('foo error');

@@ -90,3 +90,3 @@ err.detail.should.equal(providedDetails);

result.should.eql({
messages: [original1, original2, original1, original2],
messages: [[original1, original2], [original1, original2]],
offset: '123'

@@ -136,3 +136,3 @@ });

result.should.eql({
messages: [payload1, original1, original2, payload2],
messages: [payload1, [original1, original2], payload2],
offset: '123'

@@ -139,0 +139,0 @@ });

@@ -7,3 +7,3 @@ // This file has been generated from coffee source files

FakeProzess = require('../../src/index').FakeProzess;
FakeProzess = require('../../src/lib/FakeProzess');

@@ -10,0 +10,0 @@ describe('FakeProzess', function() {

// This file has been generated from coffee source files
var TopicConsumer, ZooKafka, mockery, should, sinon, _;
var TopicConsumer, TopicProducer, ZooKafka, mockery, should, sinon, _;

@@ -15,2 +15,4 @@ mockery = require('mockery');

TopicProducer = require('../../src/lib/TopicProducer');
ZooKafka = require('../../src/lib/ZooKafka');

@@ -55,3 +57,6 @@

}));
mockery.registerAllowables(['../../src/lib/Kafkazoo', 'events', 'util', 'underscore']);
mockery.registerMock('./TopicProducer', stubbedClass('topicProducer', function(stub) {
return stub.connections = stub._initArgs[0];
}));
mockery.registerAllowables(['../../src/lib/Kafkazoo', './Connections', 'events', 'util', 'underscore']);
return Kafkazoo = require('../../src/lib/Kafkazoo');

@@ -74,2 +79,3 @@ });

stubbed['topicConsumer'] = sinon.stub(new TopicConsumer(kafkaConnectionsStub));
stubbed['topicProducer'] = sinon.stub(new TopicProducer(kafkaConnectionsStub));
return kafka = new Kafkazoo();

@@ -166,3 +172,3 @@ });

});
return describe('createConsumer', function() {
describe('createConsumer', function() {
it('should create a topic consumer with given group and topic', function() {

@@ -180,2 +186,7 @@ kafka.createConsumer('topic', 'group');

});
return describe('createProducer', function() {
return it('should create a producer for a given topic', function() {
return kafka.createProducer('topic');
});
});
});

@@ -182,0 +193,0 @@

// This file has been generated from coffee source files
var concat, mockery, should, sinon, stream, util, _,
var FakeProzess, concat, mockery, should, sinon, stream, util, _,
__indexOf = [].indexOf || function(item) { for (var i = 0, l = this.length; i < l; i++) { if (i in this && this[i] === item) return i; } return -1; };

@@ -20,2 +20,4 @@

FakeProzess = require('../../src/lib/FakeProzess');
describe('PartitionConsumer class', function() {

@@ -281,3 +283,3 @@ var DEFAULT_NOMSG_TIMEOUT, PartitionConsumer, ProzessConsumerStub, ProzessStub, TopicConsumerStub, ZookeeperClientStub, createTestPartitionConsumer, fakeClock, given, partitionConsumer;

});
return partitionConsumer.onConsumptionError(partitionConsumer, 'foo error');
return partitionConsumer.onConsumptionError('foo error');
});

@@ -540,3 +542,3 @@ });

delegated.calledOnce.should.be["true"];
return delegated.calledWith(partitionConsumer).should.be["true"];
return delegated.calledWith().should.be["true"];
});

@@ -568,3 +570,3 @@ it('should try another consumeNext by default', function() {

delegated.calledOnce.should.be["true"];
return delegated.calledWith(partitionConsumer, 'foo!').should.be["true"];
return delegated.calledWith('foo!').should.be["true"];
});

@@ -621,4 +623,3 @@ it('should emit error event by default', function() {

partitionConsumer.consumeNext();
delegated.calledOnce.should.be["true"];
return delegated.calledWith(partitionConsumer).should.be["true"];
return delegated.calledOnce.should.be["true"];
});

@@ -625,0 +626,0 @@ it('should retry consume by default (after a timeout)', function() {

@@ -29,12 +29,5 @@ // This file has been generated from coffee source files

before(function() {
StrategyStub = (function() {
function StrategyStub() {}
StrategyStub.prototype.on = function() {};
StrategyStub.prototype.connect = function() {};
return StrategyStub;
})();
StrategyStub = {
standAlone: function() {}
};
PartitionConsumerStub = (function(_super) {

@@ -69,3 +62,3 @@ __extends(PartitionConsumerStub, _super);

mockery.registerMock('./PartitionConsumer', PartitionConsumerStub);
mockery.registerMock('./rebalanceStrategy/Standalone', StrategyStub);
mockery.registerMock('./rebalanceStrategy', StrategyStub);
mockery.registerAllowables(['stream', 'util', 'async', 'underscore', 'zlib', 'crypto', './Compression', '../../src/lib/TopicConsumer']);

@@ -123,19 +116,14 @@ return TopicConsumer = require('../../src/lib/TopicConsumer');

topicConsumer.partitionConsumers.should.eql({});
return topicConsumer.rebalancer.should.be.instanceOf(StrategyStub);
return topicConsumer.rebalanceStrategy.should.be.equal(StrategyStub.standAlone);
});
it('can be given a strategy class', function() {
var StrategyStub2;
StrategyStub2 = (function() {
function StrategyStub2() {}
StrategyStub2.prototype.on = function() {};
return StrategyStub2;
})();
it('can be given a rebalancer strategy', function() {
var strategy;
strategy = function() {
return this.emit('error', 'not called');
};
topicConsumer = new TopicConsumer(zkClientStub, 'groupA', 'foo', {
rebalanceStrategy: StrategyStub2
rebalanceStrategy: strategy
});
topicConsumer.rebalancer.should.not.be.instanceOf(StrategyStub);
return topicConsumer.rebalancer.should.be.instanceOf(StrategyStub2);
topicConsumer.rebalanceStrategy.should.not.be.equal(StrategyStub.standAlone);
return topicConsumer.rebalanceStrategy.should.be.equal(strategy);
});

@@ -160,6 +148,6 @@ return it('should stream data via preprocess', function(done) {

describe('#connect', function() {
return it('should connect the used rebalance strategy', function() {
sinon.spy(topicConsumer.rebalancer, 'connect');
return it('should call the used rebalance strategy', function() {
sinon.stub(topicConsumer, 'rebalanceStrategy');
topicConsumer.connect();
return topicConsumer.rebalancer.connect.calledOnce.should.be["true"];
return topicConsumer.rebalanceStrategy.calledOnce.should.be["true"];
});

@@ -166,0 +154,0 @@ });

@@ -11,3 +11,3 @@ {

],
"version": "0.1.1",
"version": "0.2.0",
"repository": {

@@ -14,0 +14,0 @@ "type": "git",

Sorry, the diff of this file is not supported yet