Comparing version 1.0.2 to 1.1.0
@@ -12,3 +12,3 @@ 'use strict'; | ||
function BaseConsumer(options) { | ||
this.options = _.partialRight(_.merge, _.defaults)(options || {}, { | ||
this.options = _.defaultsDeep(options || {}, { | ||
timeout: 100, // client timeout for produce and fetch requests | ||
@@ -15,0 +15,0 @@ idleTimeout: 1000, // timeout between fetch requests |
@@ -5,3 +5,3 @@ 'use strict'; | ||
var Connection = require('./connection'); | ||
var protocol = require('./protocol'); | ||
var Protocol = require('./protocol'); | ||
var errors = require('./errors'); | ||
@@ -14,3 +14,3 @@ var _ = require('lodash'); | ||
self.options = _.partialRight(_.merge, _.defaults)(options || {}, { | ||
self.options = _.defaultsDeep(options || {}, { | ||
clientId: 'no-kafka-client', | ||
@@ -28,3 +28,5 @@ connectionString: '127.0.0.1:9092', | ||
self.encoder = new protocol.Writer(256 * 1024); | ||
self.protocol = new Protocol({ | ||
writerBufSize: 256 * 1024 | ||
}); | ||
@@ -46,2 +48,3 @@ // client metadata | ||
return _(topics).flatten().transform(function (a, tv) { | ||
if (tv === undefined) { return false; } // requiredAcks=0 | ||
_.each(tv.partitions, function (p) { | ||
@@ -133,11 +136,11 @@ a.push(_.merge({ | ||
buffer = self.encoder.reset().MetadataRequest({ | ||
buffer = self.protocol.write().MetadataRequest({ | ||
correlationId: self.correlationId++, | ||
clientId: self.options.clientId, | ||
topicNames: topicNames || [] | ||
}).result(); | ||
}).result; | ||
return Promise.any(self.initialBrokers.map(function (connection) { | ||
return connection.send(buffer).then(function (responseBuffer) { | ||
return protocol.read(responseBuffer).MetadataResponse().result; | ||
return self.protocol.read(responseBuffer).MetadataResponse().result; | ||
}); | ||
@@ -200,3 +203,3 @@ })) | ||
return Promise.all(_.map(requests, function (topics, leader) { | ||
var buffer = self.encoder.reset().ProduceRequest({ | ||
var buffer = self.protocol.write().ProduceRequest({ | ||
correlationId: self.correlationId++, | ||
@@ -207,7 +210,7 @@ clientId: self.options.clientId, | ||
topics: topics | ||
}).result(); | ||
}).result; | ||
return self.brokerConnections[leader].send(buffer, self.options.requiredAcks === 0).then(function (responseBuffer) { | ||
if (self.options.requiredAcks !== 0) { | ||
return protocol.read(responseBuffer).ProduceResponse().result.topics; | ||
return self.protocol.read(responseBuffer).ProduceResponse().result.topics; | ||
} | ||
@@ -235,3 +238,3 @@ }) | ||
buffer = self.encoder.reset().FetchRequest({ | ||
buffer = self.protocol.write().FetchRequest({ | ||
correlationId: self.correlationId++, | ||
@@ -242,6 +245,6 @@ clientId: self.options.clientId, | ||
topics: topics | ||
}).buffer; | ||
}).result; | ||
return self.brokerConnections[leader].send(buffer).then(function (responseBuffer) { | ||
return protocol.read(responseBuffer).FetchResponse().result.topics; | ||
return self.protocol.read(responseBuffer).FetchResponse().result.topics; | ||
}) | ||
@@ -261,10 +264,10 @@ .catch(errors.NoKafkaConnectionError, function (err) { | ||
return Promise.all(_.map(requests, function (topics, leader) { | ||
var buffer = self.encoder.reset().OffsetRequest({ | ||
var buffer = self.protocol.write().OffsetRequest({ | ||
correlationId: self.correlationId++, | ||
clientId: self.options.clientId, | ||
topics: topics | ||
}).buffer; | ||
}).result; | ||
return self.brokerConnections[leader].send(buffer).then(function (responseBuffer) { | ||
return protocol.read(responseBuffer).OffsetResponse().result.topics; | ||
return self.protocol.read(responseBuffer).OffsetResponse().result.topics; | ||
}); | ||
@@ -281,3 +284,3 @@ })) | ||
return Promise.all(_.map(requests, function (topics, leader) { | ||
var buffer = self.encoder.reset().OffsetCommitRequestV0({ | ||
var buffer = self.protocol.write().OffsetCommitRequestV0({ | ||
correlationId: self.correlationId++, | ||
@@ -287,6 +290,6 @@ clientId: self.options.clientId, | ||
topics: topics | ||
}).buffer; | ||
}).result; | ||
return self.brokerConnections[leader].send(buffer).then(function (responseBuffer) { | ||
return protocol.read(responseBuffer).OffsetCommitResponse().result.topics; | ||
return self.protocol.read(responseBuffer).OffsetCommitResponse().result.topics; | ||
}); | ||
@@ -304,3 +307,3 @@ })) | ||
return Promise.all(_.map(requests, function (topics, leader) { | ||
var buffer = self.encoder.reset().OffsetFetchRequest({ | ||
var buffer = self.protocol.write().OffsetFetchRequest({ | ||
correlationId: self.correlationId++, | ||
@@ -311,6 +314,6 @@ clientId: self.options.clientId, | ||
topics: topics | ||
}).buffer; | ||
}).result; | ||
return self.brokerConnections[leader].send(buffer).then(function (responseBuffer) { | ||
return protocol.read(responseBuffer).OffsetFetchResponse().result.topics; | ||
return self.protocol.read(responseBuffer).OffsetFetchResponse().result.topics; | ||
}); | ||
@@ -327,3 +330,3 @@ })) | ||
return self._findGroupCoordinator(groupId).then(function (connection) { | ||
var buffer = self.encoder.reset().OffsetFetchRequest({ | ||
var buffer = self.protocol.write().OffsetFetchRequest({ | ||
correlationId: self.correlationId++, | ||
@@ -334,6 +337,6 @@ clientId: self.options.clientId, | ||
topics: requests | ||
}).buffer; | ||
}).result; | ||
return connection.send(buffer).then(function (responseBuffer) { | ||
return protocol.read(responseBuffer).OffsetFetchResponse().result.topics; | ||
return self.protocol.read(responseBuffer).OffsetFetchResponse().result.topics; | ||
}); | ||
@@ -365,11 +368,11 @@ }) | ||
buffer = self.encoder.reset().GroupCoordinatorRequest({ | ||
buffer = self.protocol.write().GroupCoordinatorRequest({ | ||
correlationId: self.correlationId++, | ||
clientId: self.options.clientId, | ||
groupId: groupId | ||
}).buffer; | ||
}).result; | ||
self.groupCoordinators[groupId] = Promise.any(self.initialBrokers.map(function (connection) { | ||
return connection.send(buffer).then(function (responseBuffer) { | ||
var result = protocol.read(responseBuffer).GroupCoordinatorResponse().result; | ||
var result = self.protocol.read(responseBuffer).GroupCoordinatorResponse().result; | ||
if (result.error) { | ||
@@ -402,3 +405,3 @@ throw result.error; | ||
return self._findGroupCoordinator(groupId).then(function (connection) { | ||
var buffer = self.encoder.reset().JoinConsumerGroupRequest({ | ||
var buffer = self.protocol.write().JoinConsumerGroupRequest({ | ||
correlationId: self.correlationId++, | ||
@@ -410,6 +413,6 @@ clientId: self.options.clientId, | ||
groupProtocols: strategies | ||
}).buffer; | ||
}).result; | ||
return connection.send(buffer).then(function (responseBuffer) { | ||
var result = protocol.read(responseBuffer).JoinConsumerGroupResponse().result; | ||
var result = self.protocol.read(responseBuffer).JoinConsumerGroupResponse().result; | ||
if (result.error) { | ||
@@ -427,3 +430,3 @@ throw result.error; | ||
return self._findGroupCoordinator(groupId).then(function (connection) { | ||
var buffer = self.encoder.reset().HeartbeatRequest({ | ||
var buffer = self.protocol.write().HeartbeatRequest({ | ||
correlationId: self.correlationId++, | ||
@@ -434,6 +437,6 @@ clientId: self.options.clientId, | ||
generationId: generationId | ||
}).buffer; | ||
}).result; | ||
return connection.send(buffer).then(function (responseBuffer) { | ||
var result = protocol.read(responseBuffer).HeartbeatResponse().result; | ||
var result = self.protocol.read(responseBuffer).HeartbeatResponse().result; | ||
if (result.error) { | ||
@@ -451,3 +454,3 @@ throw result.error; | ||
return self._findGroupCoordinator(groupId).then(function (connection) { | ||
var buffer = self.encoder.reset().SyncConsumerGroupRequest({ | ||
var buffer = self.protocol.write().SyncConsumerGroupRequest({ | ||
correlationId: self.correlationId++, | ||
@@ -459,6 +462,6 @@ clientId: self.options.clientId, | ||
groupAssignment: groupAssignment | ||
}).buffer; | ||
}).result; | ||
return connection.send(buffer).then(function (responseBuffer) { | ||
var result = protocol.read(responseBuffer).SyncConsumerGroupResponse().result; | ||
var result = self.protocol.read(responseBuffer).SyncConsumerGroupResponse().result; | ||
if (result.error) { | ||
@@ -476,3 +479,3 @@ throw result.error; | ||
return self._findGroupCoordinator(groupId).then(function (connection) { | ||
var buffer = self.encoder.reset().LeaveGroupRequest({ | ||
var buffer = self.protocol.write().LeaveGroupRequest({ | ||
correlationId: self.correlationId++, | ||
@@ -482,6 +485,6 @@ clientId: self.options.clientId, | ||
memberId: memberId | ||
}).buffer; | ||
}).result; | ||
return connection.send(buffer).then(function (responseBuffer) { | ||
var result = protocol.read(responseBuffer).LeaveGroupResponse().result; | ||
var result = self.protocol.read(responseBuffer).LeaveGroupResponse().result; | ||
if (result.error) { | ||
@@ -500,3 +503,3 @@ throw result.error; | ||
return self._findGroupCoordinator(groupId).then(function (connection) { | ||
var buffer = self.encoder.reset().OffsetCommitRequestV2({ | ||
var buffer = self.protocol.write().OffsetCommitRequestV2({ | ||
correlationId: self.correlationId++, | ||
@@ -509,6 +512,6 @@ clientId: self.options.clientId, | ||
topics: requests | ||
}).buffer; | ||
}).result; | ||
return connection.send(buffer).then(function (responseBuffer) { | ||
return protocol.read(responseBuffer).OffsetCommitResponse().result.topics; | ||
return self.protocol.read(responseBuffer).OffsetCommitResponse().result.topics; | ||
}); | ||
@@ -523,6 +526,6 @@ }) | ||
var buffer = self.encoder.reset().ListGroupsRequest({ | ||
var buffer = self.protocol.write().ListGroupsRequest({ | ||
correlationId: self.correlationId++, | ||
clientId: self.options.clientId | ||
}).buffer; | ||
}).result; | ||
@@ -532,3 +535,3 @@ return self._metadata().then(function () { | ||
return connection.send(buffer).then(function (responseBuffer) { | ||
return protocol.read(responseBuffer).ListGroupResponse().result.groups; | ||
return self.protocol.read(responseBuffer).ListGroupResponse().result.groups; | ||
}); | ||
@@ -544,10 +547,10 @@ }); | ||
return self._findGroupCoordinator(groupId).then(function (connection) { | ||
var buffer = self.encoder.reset().DescribeGroupRequest({ | ||
var buffer = self.protocol.write().DescribeGroupRequest({ | ||
correlationId: self.correlationId++, | ||
clientId: self.options.clientId, | ||
groups: [groupId] | ||
}).buffer; | ||
}).result; | ||
return connection.send(buffer).then(function (responseBuffer) { | ||
return protocol.read(responseBuffer).DescribeGroupResponse().result.groups[0]; | ||
return self.protocol.read(responseBuffer).DescribeGroupResponse().result.groups[0]; | ||
}); | ||
@@ -554,0 +557,0 @@ }); |
@@ -10,3 +10,3 @@ 'use strict'; | ||
function GroupConsumer(options) { | ||
this.options = _.partialRight(_.merge, _.defaults)(options || {}, { | ||
this.options = _.defaultsDeep(options || {}, { | ||
groupId: 'no-kafka-group-v0.9', | ||
@@ -322,19 +322,1 @@ sessionTimeout: 15000, // min 6000, max 30000 | ||
/** | ||
* List all consumer groups | ||
* | ||
* @return {Promise} | ||
*/ | ||
GroupConsumer.prototype.listGroups = function () { | ||
return this.client.listGroupsRequest(); | ||
}; | ||
/** | ||
* Describe consumer group | ||
* | ||
* @param {String} groupId | ||
* @return {Promise} | ||
*/ | ||
GroupConsumer.prototype.describeGroup = function (groupId) { | ||
return this.client.describeGroupRequest(groupId || this.options.groupId); | ||
}; |
@@ -63,2 +63,3 @@ 'use strict'; | ||
exports.GroupConsumer = require('./group_consumer'); | ||
exports.GroupAdmin = require('./group_admin'); | ||
@@ -65,0 +66,0 @@ // offset request time constants |
@@ -9,3 +9,3 @@ 'use strict'; | ||
function Producer(options) { | ||
this.options = _.partialRight(_.merge, _.defaults)(options || {}, { | ||
this.options = _.defaultsDeep(options || {}, { | ||
requiredAcks: 1, | ||
@@ -12,0 +12,0 @@ timeout: 100 |
'use strict'; | ||
var protocol = require('bin-protocol'); | ||
var Protocol = require('./index'); | ||
var globals = require('./globals'); | ||
protocol.define('ListGroupsRequest', { | ||
Protocol.define('ListGroupsRequest', { | ||
write: function (data) { | ||
@@ -18,3 +18,3 @@ this | ||
protocol.define('ListGroupResponse_GroupItem', { | ||
Protocol.define('ListGroupResponse_GroupItem', { | ||
read: function () { | ||
@@ -27,3 +27,3 @@ this | ||
protocol.define('ListGroupResponse', { | ||
Protocol.define('ListGroupResponse', { | ||
read: function () { | ||
@@ -37,3 +37,3 @@ this | ||
protocol.define('DescribeGroupRequest', { | ||
Protocol.define('DescribeGroupRequest', { | ||
write: function (data) { | ||
@@ -51,3 +51,3 @@ this | ||
protocol.define('DescribeGroupResponse_ConsumerGroupMemberItem', { | ||
Protocol.define('DescribeGroupResponse_ConsumerGroupMemberItem', { | ||
read: function () { | ||
@@ -66,3 +66,4 @@ this | ||
protocol.define('DescribeGroupResponse_GroupMemberItem', { | ||
/* istanbul ignore next */ | ||
Protocol.define('DescribeGroupResponse_GroupMemberItem', { | ||
read: function () { | ||
@@ -78,3 +79,3 @@ this | ||
protocol.define('DescribeGroupResponse_GroupItem', { | ||
Protocol.define('DescribeGroupResponse_GroupItem', { | ||
read: function () { | ||
@@ -90,2 +91,3 @@ this | ||
} else { | ||
/* istanbul ignore next */ | ||
this.array('members', this.DescribeGroupResponse_GroupMemberItem); | ||
@@ -96,3 +98,3 @@ } | ||
protocol.define('DescribeGroupResponse', { | ||
Protocol.define('DescribeGroupResponse', { | ||
read: function () { | ||
@@ -99,0 +101,0 @@ this |
'use strict'; | ||
var protocol = require('bin-protocol'); | ||
var Protocol = require('./index'); | ||
var errors = require('../errors'); | ||
@@ -17,3 +17,3 @@ var crc32 = require('buffer-crc32'); | ||
// variable length primitives, bytes | ||
protocol.define('bytes', { | ||
Protocol.define('bytes', { | ||
read: function () { | ||
@@ -31,9 +31,8 @@ this.Int32BE('length'); | ||
} else { | ||
if (Buffer.isBuffer(value) || typeof value === 'string') { | ||
this | ||
.Int32BE(value.length) | ||
.raw(value); | ||
} else { | ||
throw new Error('Kafka bytes value should be a Buffer or String'); | ||
if (!Buffer.isBuffer(value)) { | ||
value = new Buffer(_(value).toString(), 'utf8'); | ||
} | ||
this | ||
.Int32BE(value.length) | ||
.raw(value); | ||
} | ||
@@ -44,3 +43,3 @@ } | ||
// variable length primitives, string | ||
protocol.define('string', { | ||
Protocol.define('string', { | ||
read: function () { | ||
@@ -58,10 +57,6 @@ this.Int16BE('length'); | ||
} else { | ||
if (typeof value === 'string') { | ||
value = new Buffer(value, 'utf8'); | ||
this | ||
.Int16BE(value.length) | ||
.raw(value); | ||
} else { | ||
throw new Error('Kafka string value should be a String'); | ||
} | ||
value = new Buffer(_(value).toString(), 'utf8'); | ||
this | ||
.Int16BE(value.length) | ||
.raw(value); | ||
} | ||
@@ -72,3 +67,3 @@ } | ||
// array | ||
protocol.define('array', { | ||
Protocol.define('array', { | ||
read: function (fn) { | ||
@@ -94,3 +89,3 @@ this.Int32BE('length'); | ||
// return Error instance | ||
protocol.define('ErrorCode', { | ||
Protocol.define('ErrorCode', { | ||
read: function () { | ||
@@ -103,3 +98,3 @@ this.Int16BE('error'); | ||
// 64 bit offset, convert from Long (https://github.com/dcodeIO/long.js) to double, 53 bit | ||
protocol.define('KafkaOffset', { | ||
Protocol.define('KafkaOffset', { | ||
read: function () { | ||
@@ -114,3 +109,3 @@ this.Int64BE('offset'); | ||
protocol.define('RequestHeader', { | ||
Protocol.define('RequestHeader', { | ||
write: function (header) { | ||
@@ -129,3 +124,3 @@ this | ||
protocol.define('MessageAttributes', { | ||
Protocol.define('MessageAttributes', { | ||
read: function () { | ||
@@ -140,3 +135,3 @@ this.Int8('_raw'); | ||
protocol.define('Message', { | ||
Protocol.define('Message', { | ||
read: function () { | ||
@@ -168,3 +163,3 @@ // var _o1 = this.offset; | ||
protocol.define('MessageSetItem', { | ||
Protocol.define('MessageSetItem', { | ||
read: function (size, end) { | ||
@@ -189,3 +184,3 @@ if (size < 8 + 4) { | ||
var _o1, _o2; | ||
this.Int64BE(value.offset); | ||
this.KafkaOffset(value.offset); | ||
_o1 = this.offset; | ||
@@ -202,3 +197,3 @@ this | ||
protocol.define('MessageSet', { | ||
Protocol.define('MessageSet', { | ||
read: function (size) { | ||
@@ -213,3 +208,3 @@ var _o1 = this.offset; | ||
return _.dropRightWhile(this.context.items, { _partial: true }); // drop partailly read messages | ||
return _.dropRightWhile(this.context.items, { _partial: true }); // drop partially read messages | ||
}, | ||
@@ -216,0 +211,0 @@ write: function (items) { |
'use strict'; | ||
var protocol = require('bin-protocol'); | ||
var Protocol = require('./index'); | ||
var globals = require('./globals'); | ||
@@ -12,7 +12,7 @@ | ||
protocol.define('FetchRequestPartitionItem', { | ||
Protocol.define('FetchRequestPartitionItem', { | ||
write: function (data) { // {partition, offset, maxBytes} | ||
this | ||
.Int32BE(data.partition) | ||
.Int64BE(data.offset) | ||
.KafkaOffset(data.offset) | ||
.Int32BE(data.maxBytes); | ||
@@ -22,3 +22,3 @@ } | ||
protocol.define('FetchRequestTopicItem', { | ||
Protocol.define('FetchRequestTopicItem', { | ||
write: function (data) { // {topicName, partitions} | ||
@@ -31,3 +31,3 @@ this | ||
protocol.define('FetchRequest', { | ||
Protocol.define('FetchRequest', { | ||
write: function (data) { // { timeout, minBytes, topics } | ||
@@ -48,3 +48,3 @@ this | ||
protocol.define('FetchResponseTopicItem', { | ||
Protocol.define('FetchResponseTopicItem', { | ||
read: function () { | ||
@@ -57,3 +57,3 @@ this | ||
protocol.define('FetchResponsePartitionItem', { | ||
Protocol.define('FetchResponsePartitionItem', { | ||
read: function () { | ||
@@ -69,3 +69,3 @@ this | ||
protocol.define('FetchResponse', { | ||
Protocol.define('FetchResponse', { | ||
read: function () { | ||
@@ -72,0 +72,0 @@ this |
'use strict'; | ||
var protocol = require('bin-protocol'); | ||
var Protocol = require('./index'); | ||
var globals = require('./globals'); | ||
@@ -13,3 +13,3 @@ | ||
protocol.define('GroupCoordinatorRequest', { | ||
Protocol.define('GroupCoordinatorRequest', { | ||
write: function (data) { // { groupId } | ||
@@ -27,3 +27,3 @@ this | ||
protocol.define('GroupCoordinatorResponse', { | ||
Protocol.define('GroupCoordinatorResponse', { | ||
read: function () { | ||
@@ -39,3 +39,4 @@ this | ||
protocol.define('JoinGroupRequest_GroupProtocolItem', { | ||
/* istanbul ignore next */ | ||
Protocol.define('JoinGroupRequest_GroupProtocolItem', { | ||
write: function (data) { // { name, metadata } | ||
@@ -48,3 +49,4 @@ this | ||
protocol.define('JoinGroupRequest', { | ||
/* istanbul ignore next */ | ||
Protocol.define('JoinGroupRequest', { | ||
write: function (data) { // { groupId sessionTimeout memberId protocolType groupProtocols } | ||
@@ -67,3 +69,3 @@ this | ||
// consumer protocol | ||
protocol.define('JoinConsumerGroupRequest_GroupProtocolItem', { | ||
Protocol.define('JoinConsumerGroupRequest_GroupProtocolItem', { | ||
write: function (data) { // { strategy, version, subscriptions, metadata } | ||
@@ -86,3 +88,3 @@ var _o1, _o2; | ||
protocol.define('JoinConsumerGroupRequest', { | ||
Protocol.define('JoinConsumerGroupRequest', { | ||
write: function (data) { // { groupId sessionTimeout memberId groupProtocols } | ||
@@ -104,3 +106,4 @@ this | ||
protocol.define('JoinGroupResponse_Member', { | ||
/* istanbul ignore next */ | ||
Protocol.define('JoinGroupResponse_Member', { | ||
read: function () { | ||
@@ -113,3 +116,4 @@ this | ||
protocol.define('JoinGroupResponse', { | ||
/* istanbul ignore next */ | ||
Protocol.define('JoinGroupResponse', { | ||
read: function () { | ||
@@ -127,3 +131,3 @@ this | ||
protocol.define('JoinConsumerGroupResponse_Member', { | ||
Protocol.define('JoinConsumerGroupResponse_Member', { | ||
read: function () { | ||
@@ -139,3 +143,3 @@ this | ||
protocol.define('JoinConsumerGroupResponse', { | ||
Protocol.define('JoinConsumerGroupResponse', { | ||
read: function () { | ||
@@ -153,3 +157,4 @@ this | ||
protocol.define('SyncGroupRequest_GroupAssignment', { | ||
/* istanbul ignore next */ | ||
Protocol.define('SyncGroupRequest_GroupAssignment', { | ||
write: function (data) { // { memberId memberAssignment } | ||
@@ -162,3 +167,4 @@ this | ||
protocol.define('SyncGroupRequest', { | ||
/* istanbul ignore next */ | ||
Protocol.define('SyncGroupRequest', { | ||
write: function (data) { // { groupId generationId memberId groupAssignment } | ||
@@ -180,3 +186,3 @@ this | ||
// consumer protocol | ||
protocol.define('SyncConsumerGroupRequest_PartitionAssignment', { | ||
Protocol.define('SyncConsumerGroupRequest_PartitionAssignment', { | ||
write: function (data) { // { topic, partitions } | ||
@@ -194,3 +200,3 @@ this | ||
protocol.define('SyncConsumerGroupRequest_MemberAssignment', { | ||
Protocol.define('SyncConsumerGroupRequest_MemberAssignment', { | ||
write: function (data) { // { version partitionAssignment metadata } | ||
@@ -214,3 +220,3 @@ this | ||
protocol.define('SyncConsumerGroupRequest_GroupAssignment', { | ||
Protocol.define('SyncConsumerGroupRequest_GroupAssignment', { | ||
write: function (data) { // { memberId memberAssignment} | ||
@@ -228,3 +234,3 @@ var _o1, _o2; | ||
protocol.define('SyncConsumerGroupRequest', { | ||
Protocol.define('SyncConsumerGroupRequest', { | ||
write: function (data) { // { groupId generationId memberId groupAssignment } | ||
@@ -245,3 +251,4 @@ this | ||
protocol.define('SyncGroupResponse', { | ||
/* istanbul ignore next */ | ||
Protocol.define('SyncGroupResponse', { | ||
read: function () { | ||
@@ -256,3 +263,3 @@ this | ||
// consumer protocol | ||
protocol.define('SyncConsumerGroupResponse', { | ||
Protocol.define('SyncConsumerGroupResponse', { | ||
read: function () { | ||
@@ -266,3 +273,3 @@ this | ||
protocol.define('HeartbeatRequest', { | ||
Protocol.define('HeartbeatRequest', { | ||
write: function (data) { // { groupId generationId memberId } | ||
@@ -282,3 +289,3 @@ this | ||
protocol.define('HeartbeatResponse', { | ||
Protocol.define('HeartbeatResponse', { | ||
read: function () { | ||
@@ -291,3 +298,3 @@ this | ||
protocol.define('LeaveGroupRequest', { | ||
Protocol.define('LeaveGroupRequest', { | ||
write: function (data) { // { groupId memberId } | ||
@@ -306,3 +313,3 @@ this | ||
protocol.define('LeaveGroupResponse', { | ||
Protocol.define('LeaveGroupResponse', { | ||
read: function () { | ||
@@ -309,0 +316,0 @@ this |
'use strict'; | ||
var protocol = require('bin-protocol'); | ||
var Protocol = require('bin-protocol'); | ||
// https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol | ||
module.exports = protocol; | ||
var KafkaProtocol = Protocol.createProtocol(); | ||
module.exports = KafkaProtocol; | ||
[ | ||
@@ -10,0 +12,0 @@ 'common', |
'use strict'; | ||
var protocol = require('bin-protocol'); | ||
var Protocol = require('./index'); | ||
var globals = require('./globals'); | ||
@@ -13,3 +13,3 @@ | ||
protocol.define('MetadataRequest', { | ||
Protocol.define('MetadataRequest', { | ||
write: function (data) { // data: { correlationId, clientId, [topicNames] } | ||
@@ -27,3 +27,3 @@ this | ||
protocol.define('Broker', { | ||
Protocol.define('Broker', { | ||
read: function () { | ||
@@ -37,3 +37,3 @@ this | ||
protocol.define('PartitionMetadata', { | ||
Protocol.define('PartitionMetadata', { | ||
read: function () { | ||
@@ -49,3 +49,3 @@ this | ||
protocol.define('TopicMetadata', { | ||
Protocol.define('TopicMetadata', { | ||
read: function () { | ||
@@ -59,3 +59,3 @@ this | ||
protocol.define('MetadataResponse', { | ||
Protocol.define('MetadataResponse', { | ||
read: function () { | ||
@@ -62,0 +62,0 @@ this |
'use strict'; | ||
var protocol = require('bin-protocol'); | ||
var Protocol = require('./index'); | ||
var globals = require('./globals'); | ||
@@ -15,3 +15,3 @@ | ||
// v0 | ||
protocol.define('OffsetCommitRequestV0PartitionItem', { | ||
Protocol.define('OffsetCommitRequestV0PartitionItem', { | ||
write: function (data) { // { partition, offset, metadata } | ||
@@ -25,3 +25,3 @@ this | ||
protocol.define('OffsetCommitRequestV0TopicItem', { | ||
Protocol.define('OffsetCommitRequestV0TopicItem', { | ||
write: function (data) { // { topicName, partitions } | ||
@@ -34,3 +34,3 @@ this | ||
protocol.define('OffsetCommitRequestV0', { | ||
Protocol.define('OffsetCommitRequestV0', { | ||
write: function (data) { // { groupId, topics } | ||
@@ -50,3 +50,4 @@ this | ||
// v1 | ||
protocol.define('OffsetCommitRequestV1PartitionItem', { | ||
/* istanbul ignore next */ | ||
Protocol.define('OffsetCommitRequestV1PartitionItem', { | ||
write: function (data) { // { partition, offset, timestamp, metadata } | ||
@@ -61,3 +62,4 @@ this | ||
protocol.define('OffsetCommitRequestV1TopicItem', { | ||
/* istanbul ignore next */ | ||
Protocol.define('OffsetCommitRequestV1TopicItem', { | ||
write: function (data) { // { topicName, partitions } | ||
@@ -70,3 +72,4 @@ this | ||
protocol.define('OffsetCommitRequestV1', { | ||
/* istanbul ignore next */ | ||
Protocol.define('OffsetCommitRequestV1', { | ||
write: function (data) { // { groupId, generationId, memberId, topics } | ||
@@ -88,3 +91,3 @@ this | ||
// v2 | ||
protocol.define('OffsetCommitRequestV2', { | ||
Protocol.define('OffsetCommitRequestV2', { | ||
write: function (data) { // { groupId, generationId, memberId, retentionTime, topics } | ||
@@ -107,3 +110,3 @@ this | ||
protocol.define('OffsetCommitResponseTopicItem', { | ||
Protocol.define('OffsetCommitResponseTopicItem', { | ||
read: function () { | ||
@@ -116,3 +119,3 @@ this | ||
protocol.define('OffsetCommitResponsePartitionItem', { | ||
Protocol.define('OffsetCommitResponsePartitionItem', { | ||
read: function () { | ||
@@ -126,3 +129,3 @@ this | ||
// v0, v1 and v2 | ||
protocol.define('OffsetCommitResponse', { | ||
Protocol.define('OffsetCommitResponse', { | ||
read: function () { | ||
@@ -136,3 +139,3 @@ this | ||
protocol.define('OffsetFetchRequestTopicItem', { | ||
Protocol.define('OffsetFetchRequestTopicItem', { | ||
write: function (data) { // { topicName, partitions } | ||
@@ -145,3 +148,3 @@ this | ||
protocol.define('OffsetFetchRequest', { | ||
Protocol.define('OffsetFetchRequest', { | ||
write: function (data) { // { apiVersion, groupId, topics } | ||
@@ -160,3 +163,3 @@ this | ||
protocol.define('OffsetFetchResponseTopicItem', { | ||
Protocol.define('OffsetFetchResponseTopicItem', { | ||
read: function () { | ||
@@ -169,3 +172,3 @@ this | ||
protocol.define('OffsetFetchResponsePartitionItem', { | ||
Protocol.define('OffsetFetchResponsePartitionItem', { | ||
read: function () { | ||
@@ -180,3 +183,3 @@ this | ||
protocol.define('OffsetFetchResponse', { | ||
Protocol.define('OffsetFetchResponse', { | ||
read: function () { | ||
@@ -183,0 +186,0 @@ this |
'use strict'; | ||
var protocol = require('bin-protocol'); | ||
var Protocol = require('./index'); | ||
var globals = require('./globals'); | ||
@@ -12,3 +12,3 @@ | ||
protocol.define('OffsetRequestPartitionItem', { | ||
Protocol.define('OffsetRequestPartitionItem', { | ||
write: function (data) { // { partition, time, maxNumberOfOffsets } | ||
@@ -22,3 +22,3 @@ this | ||
protocol.define('OffsetRequestTopicItem', { | ||
Protocol.define('OffsetRequestTopicItem', { | ||
write: function (data) { // { topicName, partitions } | ||
@@ -31,3 +31,3 @@ this | ||
protocol.define('OffsetRequest', { | ||
Protocol.define('OffsetRequest', { | ||
write: function (data) { // { topics } | ||
@@ -46,3 +46,3 @@ this | ||
protocol.define('OffsetResponseTopicItem', { | ||
Protocol.define('OffsetResponseTopicItem', { | ||
read: function () { | ||
@@ -55,3 +55,3 @@ this | ||
protocol.define('OffsetResponsePartitionItem', { | ||
Protocol.define('OffsetResponsePartitionItem', { | ||
read: function () { | ||
@@ -65,3 +65,3 @@ this | ||
protocol.define('OffsetResponse', { | ||
Protocol.define('OffsetResponse', { | ||
read: function () { | ||
@@ -68,0 +68,0 @@ this |
'use strict'; | ||
var protocol = require('bin-protocol'); | ||
var Protocol = require('./index'); | ||
var globals = require('./globals'); | ||
@@ -12,3 +12,3 @@ | ||
protocol.define('ProduceRequestPartitionItem', { | ||
Protocol.define('ProduceRequestPartitionItem', { | ||
write: function (data) { // {partition, messageSet} | ||
@@ -28,3 +28,3 @@ var _o1, _o2; | ||
protocol.define('ProduceRequestTopicItem', { | ||
Protocol.define('ProduceRequestTopicItem', { | ||
write: function (data) { // {topicName, partitions} | ||
@@ -37,3 +37,3 @@ this | ||
protocol.define('ProduceRequest', { | ||
Protocol.define('ProduceRequest', { | ||
write: function (data) { // { requiredAcks, timeout, topics } | ||
@@ -53,3 +53,3 @@ this | ||
protocol.define('ProduceResponseTopicItem', { | ||
Protocol.define('ProduceResponseTopicItem', { | ||
read: function () { | ||
@@ -62,3 +62,3 @@ this | ||
protocol.define('ProduceResponsePartitionItem', { | ||
Protocol.define('ProduceResponsePartitionItem', { | ||
read: function () { | ||
@@ -72,3 +72,3 @@ this | ||
protocol.define('ProduceResponse', { | ||
Protocol.define('ProduceResponse', { | ||
read: function () { | ||
@@ -75,0 +75,0 @@ this |
@@ -10,3 +10,3 @@ 'use strict'; | ||
function SimpleConsumer(options) { | ||
this.options = _.partialRight(_.merge, _.defaults)(options || {}, { | ||
this.options = _.defaultsDeep(options || {}, { | ||
groupId: 'no-kafka-group-v0' | ||
@@ -13,0 +13,0 @@ }); |
@@ -9,3 +9,3 @@ { | ||
}, | ||
"version": "1.0.2", | ||
"version": "1.1.0", | ||
"main": "./lib/index.js", | ||
@@ -12,0 +12,0 @@ "keywords": ["kafka"], |
@@ -207,3 +207,46 @@ [![Build Status](https://travis-ci.org/oleksiyk/kafka.png)](https://travis-ci.org/oleksiyk/kafka) | ||
## GroupAdmin (consumer groups API) | ||
Offes two methods: | ||
* `listGroups` - list existing consumer groups | ||
* `describeGroup` - describe existing group by its id | ||
Example: | ||
```javascript | ||
var admin = new Kafka.GroupAdmin(); | ||
return admin.init().then(function(){ | ||
return admin.listGroups().then(function(groups){ | ||
// [ { groupId: 'no-kafka-admin-test-group', protocolType: 'consumer' } ] | ||
return admin.describeGroup('no-kafka-admin-test-group').then(function(group){ | ||
/* | ||
{ error: null, | ||
groupId: 'no-kafka-admin-test-group', | ||
state: 'Stable', | ||
protocolType: 'consumer', | ||
protocol: 'TestStrategy', | ||
members: | ||
[ { memberId: 'group-consumer-82646843-b4b8-4e91-94c9-b4708c8b05e8', | ||
clientId: 'group-consumer', | ||
clientHost: '/192.168.1.4', | ||
version: 0, | ||
subscriptions: [ 'kafka-test-topic'], | ||
metadata: <Buffer 63 6f 6e 73 75 6d 65 72 2d 6d 65 74 61 64 61 74 61>, | ||
memberAssignment: | ||
{ _blength: 44, | ||
version: 0, | ||
partitionAssignment: | ||
[ { topic: 'kafka-test-topic', | ||
partitions: [ 0, 1, 2 ] }, | ||
], | ||
metadata: null } }, | ||
] } | ||
*/ | ||
}) | ||
}); | ||
}); | ||
``` | ||
## Logging | ||
@@ -210,0 +253,0 @@ |
@@ -16,2 +16,4 @@ 'use strict'; | ||
var maxBytesTestMessagesSize; | ||
describe('SimpleConsumer', function () { | ||
@@ -69,2 +71,24 @@ before(function () { | ||
it('should correctly encode/decode utf8 string message value', function () { | ||
dataListenerSpy.reset(); | ||
return producer.send({ | ||
topic: 'kafka-test-topic', | ||
partition: 0, | ||
message: { value: '人人生而自由,在尊嚴和權利上一律平等。' } | ||
}) | ||
.delay(100) | ||
.then(function () { | ||
/* jshint expr: true */ | ||
dataListenerSpy.should.have.been.called; // eslint-disable-line | ||
dataListenerSpy.lastCall.args[0].should.be.an('array').and.have.length(1); | ||
dataListenerSpy.lastCall.args[1].should.be.a('string', 'kafka-test-topic'); | ||
dataListenerSpy.lastCall.args[2].should.be.a('number', 0); | ||
dataListenerSpy.lastCall.args[0][0].should.be.an('object'); | ||
dataListenerSpy.lastCall.args[0][0].should.have.property('message').that.is.an('object'); | ||
dataListenerSpy.lastCall.args[0][0].message.should.have.property('value'); | ||
dataListenerSpy.lastCall.args[0][0].message.value.toString('utf8').should.be.eql('人人生而自由,在尊嚴和權利上一律平等。'); | ||
}); | ||
}); | ||
it('offset() should return last offset', function () { | ||
@@ -100,2 +124,4 @@ return consumer.offset('kafka-test-topic', 0).then(function (offset) { | ||
dataListenerSpy.lastCall.args[0][1].message.value.toString('utf8').should.be.eql('p001'); | ||
// save for next test | ||
maxBytesTestMessagesSize = dataListenerSpy.lastCall.args[0][0].messageSize + dataListenerSpy.lastCall.args[0][1].messageSize; | ||
}); | ||
@@ -110,3 +136,5 @@ }); | ||
return consumer.offset('kafka-test-topic', 0).then(function (offset) { | ||
return consumer.subscribe('kafka-test-topic', 0, { offset: offset - 2, maxBytes: 30 }) | ||
// ask for maxBytes that is only 1 byte less then required for both last messages | ||
var maxBytes = 2 * (8 + 4) + maxBytesTestMessagesSize - 1; | ||
return consumer.subscribe('kafka-test-topic', 0, { offset: offset - 2, maxBytes: maxBytes }) | ||
.delay(200) | ||
@@ -143,13 +171,23 @@ .then(function () { | ||
metadata: 'm2' | ||
}, | ||
{ | ||
topic: 'kafka-test-topic', | ||
partition: 2, | ||
offset: 3, | ||
metadata: 'm3' | ||
} | ||
]).then(function (result) { | ||
result.should.be.an('array').that.has.length(2); | ||
result.should.be.an('array').that.has.length(3); | ||
result[0].should.be.an('object'); | ||
result[1].should.be.an('object'); | ||
result[2].should.be.an('object'); | ||
result[0].should.have.property('topic', 'kafka-test-topic'); | ||
result[1].should.have.property('topic', 'kafka-test-topic'); | ||
result[2].should.have.property('topic', 'kafka-test-topic'); | ||
result[0].should.have.property('partition').that.is.a('number'); | ||
result[1].should.have.property('partition').that.is.a('number'); | ||
result[2].should.have.property('partition').that.is.a('number'); | ||
result[0].should.have.property('error', null); | ||
result[1].should.have.property('error', null); | ||
result[2].should.have.property('error', null); | ||
}); | ||
@@ -168,19 +206,29 @@ }); | ||
partition: 1 | ||
}, | ||
{ | ||
topic: 'kafka-test-topic', | ||
partition: 2 | ||
} | ||
]).then(function (result) { | ||
result.should.be.an('array').that.has.length(2); | ||
result.should.be.an('array').that.has.length(3); | ||
result[0].should.be.an('object'); | ||
result[1].should.be.an('object'); | ||
result[2].should.be.an('object'); | ||
result[0].should.have.property('topic', 'kafka-test-topic'); | ||
result[1].should.have.property('topic', 'kafka-test-topic'); | ||
result[2].should.have.property('topic', 'kafka-test-topic'); | ||
result[0].should.have.property('partition').that.is.a('number'); | ||
result[1].should.have.property('partition').that.is.a('number'); | ||
result[2].should.have.property('partition').that.is.a('number'); | ||
result[0].should.have.property('offset').that.is.a('number'); | ||
result[1].should.have.property('offset').that.is.a('number'); | ||
result[2].should.have.property('offset').that.is.a('number'); | ||
result[0].should.have.property('error', null); | ||
result[1].should.have.property('error', null); | ||
result[2].should.have.property('error', null); | ||
_.find(result, { topic: 'kafka-test-topic', partition: 0 }).offset.should.be.eql(1); | ||
_.find(result, { topic: 'kafka-test-topic', partition: 1 }).offset.should.be.eql(2); | ||
_.find(result, { topic: 'kafka-test-topic', partition: 2 }).offset.should.be.eql(3); | ||
}); | ||
}); | ||
}); |
@@ -226,33 +226,2 @@ 'use strict'; | ||
it('should list groups', function () { | ||
return consumers[0].listGroups().then(function (groups) { | ||
groups.should.be.an('array').and.have.length(1); | ||
groups[0].should.be.an('object'); | ||
groups[0].should.have.property('groupId', consumers[0].options.groupId); | ||
groups[0].should.have.property('protocolType', 'consumer'); | ||
}); | ||
}); | ||
it('should describe group', function () { | ||
return consumers[0].describeGroup().then(function (group) { | ||
group.should.be.an('object'); | ||
group.should.have.property('error', null); | ||
group.should.have.property('groupId', consumers[0].options.groupId); | ||
group.should.have.property('state'); | ||
group.should.have.property('protocolType', 'consumer'); | ||
group.should.have.property('protocol', 'TestStrategy'); | ||
group.should.have.property('members').that.is.an('array'); | ||
group.members.should.have.length(3); | ||
group.members[0].should.be.an('object'); | ||
group.members[0].should.have.property('memberId').that.is.a('string'); | ||
group.members[0].should.have.property('clientId').that.is.a('string'); | ||
group.members[0].should.have.property('clientHost').that.is.a('string'); | ||
group.members[0].should.have.property('version').that.is.a('number'); | ||
group.members[0].should.have.property('subscriptions').that.is.an('array'); | ||
group.members[0].should.have.property('metadata'); | ||
group.members[0].should.have.property('memberAssignment').that.is.a('object'); | ||
group.members[0].memberAssignment.should.have.property('partitionAssignment').that.is.an('array'); | ||
}); | ||
}); | ||
it('should not log errors on clean shutdown', function () { | ||
@@ -259,0 +228,0 @@ var spy = sinon.spy(function () {}); |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
170695
39
3191
302