Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

no-kafka

Package Overview
Dependencies
Maintainers
1
Versions
98
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

no-kafka - npm Package Compare versions

Comparing version 1.0.2 to 1.1.0

lib/group_admin.js

2

lib/base_consumer.js

@@ -12,3 +12,3 @@ 'use strict';

function BaseConsumer(options) {
this.options = _.partialRight(_.merge, _.defaults)(options || {}, {
this.options = _.defaultsDeep(options || {}, {
timeout: 100, // client timeout for produce and fetch requests

@@ -15,0 +15,0 @@ idleTimeout: 1000, // timeout between fetch requests

@@ -5,3 +5,3 @@ 'use strict';

var Connection = require('./connection');
var protocol = require('./protocol');
var Protocol = require('./protocol');
var errors = require('./errors');

@@ -14,3 +14,3 @@ var _ = require('lodash');

self.options = _.partialRight(_.merge, _.defaults)(options || {}, {
self.options = _.defaultsDeep(options || {}, {
clientId: 'no-kafka-client',

@@ -28,3 +28,5 @@ connectionString: '127.0.0.1:9092',

self.encoder = new protocol.Writer(256 * 1024);
self.protocol = new Protocol({
writerBufSize: 256 * 1024
});

@@ -46,2 +48,3 @@ // client metadata

return _(topics).flatten().transform(function (a, tv) {
if (tv === undefined) { return false; } // requiredAcks=0
_.each(tv.partitions, function (p) {

@@ -133,11 +136,11 @@ a.push(_.merge({

buffer = self.encoder.reset().MetadataRequest({
buffer = self.protocol.write().MetadataRequest({
correlationId: self.correlationId++,
clientId: self.options.clientId,
topicNames: topicNames || []
}).result();
}).result;
return Promise.any(self.initialBrokers.map(function (connection) {
return connection.send(buffer).then(function (responseBuffer) {
return protocol.read(responseBuffer).MetadataResponse().result;
return self.protocol.read(responseBuffer).MetadataResponse().result;
});

@@ -200,3 +203,3 @@ }))

return Promise.all(_.map(requests, function (topics, leader) {
var buffer = self.encoder.reset().ProduceRequest({
var buffer = self.protocol.write().ProduceRequest({
correlationId: self.correlationId++,

@@ -207,7 +210,7 @@ clientId: self.options.clientId,

topics: topics
}).result();
}).result;
return self.brokerConnections[leader].send(buffer, self.options.requiredAcks === 0).then(function (responseBuffer) {
if (self.options.requiredAcks !== 0) {
return protocol.read(responseBuffer).ProduceResponse().result.topics;
return self.protocol.read(responseBuffer).ProduceResponse().result.topics;
}

@@ -235,3 +238,3 @@ })

buffer = self.encoder.reset().FetchRequest({
buffer = self.protocol.write().FetchRequest({
correlationId: self.correlationId++,

@@ -242,6 +245,6 @@ clientId: self.options.clientId,

topics: topics
}).buffer;
}).result;
return self.brokerConnections[leader].send(buffer).then(function (responseBuffer) {
return protocol.read(responseBuffer).FetchResponse().result.topics;
return self.protocol.read(responseBuffer).FetchResponse().result.topics;
})

@@ -261,10 +264,10 @@ .catch(errors.NoKafkaConnectionError, function (err) {

return Promise.all(_.map(requests, function (topics, leader) {
var buffer = self.encoder.reset().OffsetRequest({
var buffer = self.protocol.write().OffsetRequest({
correlationId: self.correlationId++,
clientId: self.options.clientId,
topics: topics
}).buffer;
}).result;
return self.brokerConnections[leader].send(buffer).then(function (responseBuffer) {
return protocol.read(responseBuffer).OffsetResponse().result.topics;
return self.protocol.read(responseBuffer).OffsetResponse().result.topics;
});

@@ -281,3 +284,3 @@ }))

return Promise.all(_.map(requests, function (topics, leader) {
var buffer = self.encoder.reset().OffsetCommitRequestV0({
var buffer = self.protocol.write().OffsetCommitRequestV0({
correlationId: self.correlationId++,

@@ -287,6 +290,6 @@ clientId: self.options.clientId,

topics: topics
}).buffer;
}).result;
return self.brokerConnections[leader].send(buffer).then(function (responseBuffer) {
return protocol.read(responseBuffer).OffsetCommitResponse().result.topics;
return self.protocol.read(responseBuffer).OffsetCommitResponse().result.topics;
});

@@ -304,3 +307,3 @@ }))

return Promise.all(_.map(requests, function (topics, leader) {
var buffer = self.encoder.reset().OffsetFetchRequest({
var buffer = self.protocol.write().OffsetFetchRequest({
correlationId: self.correlationId++,

@@ -311,6 +314,6 @@ clientId: self.options.clientId,

topics: topics
}).buffer;
}).result;
return self.brokerConnections[leader].send(buffer).then(function (responseBuffer) {
return protocol.read(responseBuffer).OffsetFetchResponse().result.topics;
return self.protocol.read(responseBuffer).OffsetFetchResponse().result.topics;
});

@@ -327,3 +330,3 @@ }))

return self._findGroupCoordinator(groupId).then(function (connection) {
var buffer = self.encoder.reset().OffsetFetchRequest({
var buffer = self.protocol.write().OffsetFetchRequest({
correlationId: self.correlationId++,

@@ -334,6 +337,6 @@ clientId: self.options.clientId,

topics: requests
}).buffer;
}).result;
return connection.send(buffer).then(function (responseBuffer) {
return protocol.read(responseBuffer).OffsetFetchResponse().result.topics;
return self.protocol.read(responseBuffer).OffsetFetchResponse().result.topics;
});

@@ -365,11 +368,11 @@ })

buffer = self.encoder.reset().GroupCoordinatorRequest({
buffer = self.protocol.write().GroupCoordinatorRequest({
correlationId: self.correlationId++,
clientId: self.options.clientId,
groupId: groupId
}).buffer;
}).result;
self.groupCoordinators[groupId] = Promise.any(self.initialBrokers.map(function (connection) {
return connection.send(buffer).then(function (responseBuffer) {
var result = protocol.read(responseBuffer).GroupCoordinatorResponse().result;
var result = self.protocol.read(responseBuffer).GroupCoordinatorResponse().result;
if (result.error) {

@@ -402,3 +405,3 @@ throw result.error;

return self._findGroupCoordinator(groupId).then(function (connection) {
var buffer = self.encoder.reset().JoinConsumerGroupRequest({
var buffer = self.protocol.write().JoinConsumerGroupRequest({
correlationId: self.correlationId++,

@@ -410,6 +413,6 @@ clientId: self.options.clientId,

groupProtocols: strategies
}).buffer;
}).result;
return connection.send(buffer).then(function (responseBuffer) {
var result = protocol.read(responseBuffer).JoinConsumerGroupResponse().result;
var result = self.protocol.read(responseBuffer).JoinConsumerGroupResponse().result;
if (result.error) {

@@ -427,3 +430,3 @@ throw result.error;

return self._findGroupCoordinator(groupId).then(function (connection) {
var buffer = self.encoder.reset().HeartbeatRequest({
var buffer = self.protocol.write().HeartbeatRequest({
correlationId: self.correlationId++,

@@ -434,6 +437,6 @@ clientId: self.options.clientId,

generationId: generationId
}).buffer;
}).result;
return connection.send(buffer).then(function (responseBuffer) {
var result = protocol.read(responseBuffer).HeartbeatResponse().result;
var result = self.protocol.read(responseBuffer).HeartbeatResponse().result;
if (result.error) {

@@ -451,3 +454,3 @@ throw result.error;

return self._findGroupCoordinator(groupId).then(function (connection) {
var buffer = self.encoder.reset().SyncConsumerGroupRequest({
var buffer = self.protocol.write().SyncConsumerGroupRequest({
correlationId: self.correlationId++,

@@ -459,6 +462,6 @@ clientId: self.options.clientId,

groupAssignment: groupAssignment
}).buffer;
}).result;
return connection.send(buffer).then(function (responseBuffer) {
var result = protocol.read(responseBuffer).SyncConsumerGroupResponse().result;
var result = self.protocol.read(responseBuffer).SyncConsumerGroupResponse().result;
if (result.error) {

@@ -476,3 +479,3 @@ throw result.error;

return self._findGroupCoordinator(groupId).then(function (connection) {
var buffer = self.encoder.reset().LeaveGroupRequest({
var buffer = self.protocol.write().LeaveGroupRequest({
correlationId: self.correlationId++,

@@ -482,6 +485,6 @@ clientId: self.options.clientId,

memberId: memberId
}).buffer;
}).result;
return connection.send(buffer).then(function (responseBuffer) {
var result = protocol.read(responseBuffer).LeaveGroupResponse().result;
var result = self.protocol.read(responseBuffer).LeaveGroupResponse().result;
if (result.error) {

@@ -500,3 +503,3 @@ throw result.error;

return self._findGroupCoordinator(groupId).then(function (connection) {
var buffer = self.encoder.reset().OffsetCommitRequestV2({
var buffer = self.protocol.write().OffsetCommitRequestV2({
correlationId: self.correlationId++,

@@ -509,6 +512,6 @@ clientId: self.options.clientId,

topics: requests
}).buffer;
}).result;
return connection.send(buffer).then(function (responseBuffer) {
return protocol.read(responseBuffer).OffsetCommitResponse().result.topics;
return self.protocol.read(responseBuffer).OffsetCommitResponse().result.topics;
});

@@ -523,6 +526,6 @@ })

var buffer = self.encoder.reset().ListGroupsRequest({
var buffer = self.protocol.write().ListGroupsRequest({
correlationId: self.correlationId++,
clientId: self.options.clientId
}).buffer;
}).result;

@@ -532,3 +535,3 @@ return self._metadata().then(function () {

return connection.send(buffer).then(function (responseBuffer) {
return protocol.read(responseBuffer).ListGroupResponse().result.groups;
return self.protocol.read(responseBuffer).ListGroupResponse().result.groups;
});

@@ -544,10 +547,10 @@ });

return self._findGroupCoordinator(groupId).then(function (connection) {
var buffer = self.encoder.reset().DescribeGroupRequest({
var buffer = self.protocol.write().DescribeGroupRequest({
correlationId: self.correlationId++,
clientId: self.options.clientId,
groups: [groupId]
}).buffer;
}).result;
return connection.send(buffer).then(function (responseBuffer) {
return protocol.read(responseBuffer).DescribeGroupResponse().result.groups[0];
return self.protocol.read(responseBuffer).DescribeGroupResponse().result.groups[0];
});

@@ -554,0 +557,0 @@ });

@@ -10,3 +10,3 @@ 'use strict';

function GroupConsumer(options) {
this.options = _.partialRight(_.merge, _.defaults)(options || {}, {
this.options = _.defaultsDeep(options || {}, {
groupId: 'no-kafka-group-v0.9',

@@ -322,19 +322,1 @@ sessionTimeout: 15000, // min 6000, max 30000

/**
* List all consumer groups
*
* @return {Promise}
*/
GroupConsumer.prototype.listGroups = function () {
return this.client.listGroupsRequest();
};
/**
* Describe consumer group
*
* @param {String} groupId
* @return {Promise}
*/
GroupConsumer.prototype.describeGroup = function (groupId) {
return this.client.describeGroupRequest(groupId || this.options.groupId);
};

@@ -63,2 +63,3 @@ 'use strict';

exports.GroupConsumer = require('./group_consumer');
exports.GroupAdmin = require('./group_admin');

@@ -65,0 +66,0 @@ // offset request time constants

@@ -9,3 +9,3 @@ 'use strict';

function Producer(options) {
this.options = _.partialRight(_.merge, _.defaults)(options || {}, {
this.options = _.defaultsDeep(options || {}, {
requiredAcks: 1,

@@ -12,0 +12,0 @@ timeout: 100

'use strict';
var protocol = require('bin-protocol');
var Protocol = require('./index');
var globals = require('./globals');
protocol.define('ListGroupsRequest', {
Protocol.define('ListGroupsRequest', {
write: function (data) {

@@ -18,3 +18,3 @@ this

protocol.define('ListGroupResponse_GroupItem', {
Protocol.define('ListGroupResponse_GroupItem', {
read: function () {

@@ -27,3 +27,3 @@ this

protocol.define('ListGroupResponse', {
Protocol.define('ListGroupResponse', {
read: function () {

@@ -37,3 +37,3 @@ this

protocol.define('DescribeGroupRequest', {
Protocol.define('DescribeGroupRequest', {
write: function (data) {

@@ -51,3 +51,3 @@ this

protocol.define('DescribeGroupResponse_ConsumerGroupMemberItem', {
Protocol.define('DescribeGroupResponse_ConsumerGroupMemberItem', {
read: function () {

@@ -66,3 +66,4 @@ this

protocol.define('DescribeGroupResponse_GroupMemberItem', {
/* istanbul ignore next */
Protocol.define('DescribeGroupResponse_GroupMemberItem', {
read: function () {

@@ -78,3 +79,3 @@ this

protocol.define('DescribeGroupResponse_GroupItem', {
Protocol.define('DescribeGroupResponse_GroupItem', {
read: function () {

@@ -90,2 +91,3 @@ this

} else {
/* istanbul ignore next */
this.array('members', this.DescribeGroupResponse_GroupMemberItem);

@@ -96,3 +98,3 @@ }

protocol.define('DescribeGroupResponse', {
Protocol.define('DescribeGroupResponse', {
read: function () {

@@ -99,0 +101,0 @@ this

'use strict';
var protocol = require('bin-protocol');
var Protocol = require('./index');
var errors = require('../errors');

@@ -17,3 +17,3 @@ var crc32 = require('buffer-crc32');

// variable length primitives, bytes
protocol.define('bytes', {
Protocol.define('bytes', {
read: function () {

@@ -31,9 +31,8 @@ this.Int32BE('length');

} else {
if (Buffer.isBuffer(value) || typeof value === 'string') {
this
.Int32BE(value.length)
.raw(value);
} else {
throw new Error('Kafka bytes value should be a Buffer or String');
if (!Buffer.isBuffer(value)) {
value = new Buffer(_(value).toString(), 'utf8');
}
this
.Int32BE(value.length)
.raw(value);
}

@@ -44,3 +43,3 @@ }

// variable length primitives, string
protocol.define('string', {
Protocol.define('string', {
read: function () {

@@ -58,10 +57,6 @@ this.Int16BE('length');

} else {
if (typeof value === 'string') {
value = new Buffer(value, 'utf8');
this
.Int16BE(value.length)
.raw(value);
} else {
throw new Error('Kafka string value should be a String');
}
value = new Buffer(_(value).toString(), 'utf8');
this
.Int16BE(value.length)
.raw(value);
}

@@ -72,3 +67,3 @@ }

// array
protocol.define('array', {
Protocol.define('array', {
read: function (fn) {

@@ -94,3 +89,3 @@ this.Int32BE('length');

// return Error instance
protocol.define('ErrorCode', {
Protocol.define('ErrorCode', {
read: function () {

@@ -103,3 +98,3 @@ this.Int16BE('error');

// 64 bit offset, convert from Long (https://github.com/dcodeIO/long.js) to double, 53 bit
protocol.define('KafkaOffset', {
Protocol.define('KafkaOffset', {
read: function () {

@@ -114,3 +109,3 @@ this.Int64BE('offset');

protocol.define('RequestHeader', {
Protocol.define('RequestHeader', {
write: function (header) {

@@ -129,3 +124,3 @@ this

protocol.define('MessageAttributes', {
Protocol.define('MessageAttributes', {
read: function () {

@@ -140,3 +135,3 @@ this.Int8('_raw');

protocol.define('Message', {
Protocol.define('Message', {
read: function () {

@@ -168,3 +163,3 @@ // var _o1 = this.offset;

protocol.define('MessageSetItem', {
Protocol.define('MessageSetItem', {
read: function (size, end) {

@@ -189,3 +184,3 @@ if (size < 8 + 4) {

var _o1, _o2;
this.Int64BE(value.offset);
this.KafkaOffset(value.offset);
_o1 = this.offset;

@@ -202,3 +197,3 @@ this

protocol.define('MessageSet', {
Protocol.define('MessageSet', {
read: function (size) {

@@ -213,3 +208,3 @@ var _o1 = this.offset;

return _.dropRightWhile(this.context.items, { _partial: true }); // drop partailly read messages
return _.dropRightWhile(this.context.items, { _partial: true }); // drop partially read messages
},

@@ -216,0 +211,0 @@ write: function (items) {

'use strict';
var protocol = require('bin-protocol');
var Protocol = require('./index');
var globals = require('./globals');

@@ -12,7 +12,7 @@

protocol.define('FetchRequestPartitionItem', {
Protocol.define('FetchRequestPartitionItem', {
write: function (data) { // {partition, offset, maxBytes}
this
.Int32BE(data.partition)
.Int64BE(data.offset)
.KafkaOffset(data.offset)
.Int32BE(data.maxBytes);

@@ -22,3 +22,3 @@ }

protocol.define('FetchRequestTopicItem', {
Protocol.define('FetchRequestTopicItem', {
write: function (data) { // {topicName, partitions}

@@ -31,3 +31,3 @@ this

protocol.define('FetchRequest', {
Protocol.define('FetchRequest', {
write: function (data) { // { timeout, minBytes, topics }

@@ -48,3 +48,3 @@ this

protocol.define('FetchResponseTopicItem', {
Protocol.define('FetchResponseTopicItem', {
read: function () {

@@ -57,3 +57,3 @@ this

protocol.define('FetchResponsePartitionItem', {
Protocol.define('FetchResponsePartitionItem', {
read: function () {

@@ -69,3 +69,3 @@ this

protocol.define('FetchResponse', {
Protocol.define('FetchResponse', {
read: function () {

@@ -72,0 +72,0 @@ this

'use strict';
var protocol = require('bin-protocol');
var Protocol = require('./index');
var globals = require('./globals');

@@ -13,3 +13,3 @@

protocol.define('GroupCoordinatorRequest', {
Protocol.define('GroupCoordinatorRequest', {
write: function (data) { // { groupId }

@@ -27,3 +27,3 @@ this

protocol.define('GroupCoordinatorResponse', {
Protocol.define('GroupCoordinatorResponse', {
read: function () {

@@ -39,3 +39,4 @@ this

protocol.define('JoinGroupRequest_GroupProtocolItem', {
/* istanbul ignore next */
Protocol.define('JoinGroupRequest_GroupProtocolItem', {
write: function (data) { // { name, metadata }

@@ -48,3 +49,4 @@ this

protocol.define('JoinGroupRequest', {
/* istanbul ignore next */
Protocol.define('JoinGroupRequest', {
write: function (data) { // { groupId sessionTimeout memberId protocolType groupProtocols }

@@ -67,3 +69,3 @@ this

// consumer protocol
protocol.define('JoinConsumerGroupRequest_GroupProtocolItem', {
Protocol.define('JoinConsumerGroupRequest_GroupProtocolItem', {
write: function (data) { // { strategy, version, subscriptions, metadata }

@@ -86,3 +88,3 @@ var _o1, _o2;

protocol.define('JoinConsumerGroupRequest', {
Protocol.define('JoinConsumerGroupRequest', {
write: function (data) { // { groupId sessionTimeout memberId groupProtocols }

@@ -104,3 +106,4 @@ this

protocol.define('JoinGroupResponse_Member', {
/* istanbul ignore next */
Protocol.define('JoinGroupResponse_Member', {
read: function () {

@@ -113,3 +116,4 @@ this

protocol.define('JoinGroupResponse', {
/* istanbul ignore next */
Protocol.define('JoinGroupResponse', {
read: function () {

@@ -127,3 +131,3 @@ this

protocol.define('JoinConsumerGroupResponse_Member', {
Protocol.define('JoinConsumerGroupResponse_Member', {
read: function () {

@@ -139,3 +143,3 @@ this

protocol.define('JoinConsumerGroupResponse', {
Protocol.define('JoinConsumerGroupResponse', {
read: function () {

@@ -153,3 +157,4 @@ this

protocol.define('SyncGroupRequest_GroupAssignment', {
/* istanbul ignore next */
Protocol.define('SyncGroupRequest_GroupAssignment', {
write: function (data) { // { memberId memberAssignment }

@@ -162,3 +167,4 @@ this

protocol.define('SyncGroupRequest', {
/* istanbul ignore next */
Protocol.define('SyncGroupRequest', {
write: function (data) { // { groupId generationId memberId groupAssignment }

@@ -180,3 +186,3 @@ this

// consumer protocol
protocol.define('SyncConsumerGroupRequest_PartitionAssignment', {
Protocol.define('SyncConsumerGroupRequest_PartitionAssignment', {
write: function (data) { // { topic, partitions }

@@ -194,3 +200,3 @@ this

protocol.define('SyncConsumerGroupRequest_MemberAssignment', {
Protocol.define('SyncConsumerGroupRequest_MemberAssignment', {
write: function (data) { // { version partitionAssignment metadata }

@@ -214,3 +220,3 @@ this

protocol.define('SyncConsumerGroupRequest_GroupAssignment', {
Protocol.define('SyncConsumerGroupRequest_GroupAssignment', {
write: function (data) { // { memberId memberAssignment}

@@ -228,3 +234,3 @@ var _o1, _o2;

protocol.define('SyncConsumerGroupRequest', {
Protocol.define('SyncConsumerGroupRequest', {
write: function (data) { // { groupId generationId memberId groupAssignment }

@@ -245,3 +251,4 @@ this

protocol.define('SyncGroupResponse', {
/* istanbul ignore next */
Protocol.define('SyncGroupResponse', {
read: function () {

@@ -256,3 +263,3 @@ this

// consumer protocol
protocol.define('SyncConsumerGroupResponse', {
Protocol.define('SyncConsumerGroupResponse', {
read: function () {

@@ -266,3 +273,3 @@ this

protocol.define('HeartbeatRequest', {
Protocol.define('HeartbeatRequest', {
write: function (data) { // { groupId generationId memberId }

@@ -282,3 +289,3 @@ this

protocol.define('HeartbeatResponse', {
Protocol.define('HeartbeatResponse', {
read: function () {

@@ -291,3 +298,3 @@ this

protocol.define('LeaveGroupRequest', {
Protocol.define('LeaveGroupRequest', {
write: function (data) { // { groupId memberId }

@@ -306,3 +313,3 @@ this

protocol.define('LeaveGroupResponse', {
Protocol.define('LeaveGroupResponse', {
read: function () {

@@ -309,0 +316,0 @@ this

'use strict';
var protocol = require('bin-protocol');
var Protocol = require('bin-protocol');
// https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
module.exports = protocol;
var KafkaProtocol = Protocol.createProtocol();
module.exports = KafkaProtocol;
[

@@ -10,0 +12,0 @@ 'common',

'use strict';
var protocol = require('bin-protocol');
var Protocol = require('./index');
var globals = require('./globals');

@@ -13,3 +13,3 @@

protocol.define('MetadataRequest', {
Protocol.define('MetadataRequest', {
write: function (data) { // data: { correlationId, clientId, [topicNames] }

@@ -27,3 +27,3 @@ this

protocol.define('Broker', {
Protocol.define('Broker', {
read: function () {

@@ -37,3 +37,3 @@ this

protocol.define('PartitionMetadata', {
Protocol.define('PartitionMetadata', {
read: function () {

@@ -49,3 +49,3 @@ this

protocol.define('TopicMetadata', {
Protocol.define('TopicMetadata', {
read: function () {

@@ -59,3 +59,3 @@ this

protocol.define('MetadataResponse', {
Protocol.define('MetadataResponse', {
read: function () {

@@ -62,0 +62,0 @@ this

'use strict';
var protocol = require('bin-protocol');
var Protocol = require('./index');
var globals = require('./globals');

@@ -15,3 +15,3 @@

// v0
protocol.define('OffsetCommitRequestV0PartitionItem', {
Protocol.define('OffsetCommitRequestV0PartitionItem', {
write: function (data) { // { partition, offset, metadata }

@@ -25,3 +25,3 @@ this

protocol.define('OffsetCommitRequestV0TopicItem', {
Protocol.define('OffsetCommitRequestV0TopicItem', {
write: function (data) { // { topicName, partitions }

@@ -34,3 +34,3 @@ this

protocol.define('OffsetCommitRequestV0', {
Protocol.define('OffsetCommitRequestV0', {
write: function (data) { // { groupId, topics }

@@ -50,3 +50,4 @@ this

// v1
protocol.define('OffsetCommitRequestV1PartitionItem', {
/* istanbul ignore next */
Protocol.define('OffsetCommitRequestV1PartitionItem', {
write: function (data) { // { partition, offset, timestamp, metadata }

@@ -61,3 +62,4 @@ this

protocol.define('OffsetCommitRequestV1TopicItem', {
/* istanbul ignore next */
Protocol.define('OffsetCommitRequestV1TopicItem', {
write: function (data) { // { topicName, partitions }

@@ -70,3 +72,4 @@ this

protocol.define('OffsetCommitRequestV1', {
/* istanbul ignore next */
Protocol.define('OffsetCommitRequestV1', {
write: function (data) { // { groupId, generationId, memberId, topics }

@@ -88,3 +91,3 @@ this

// v2
protocol.define('OffsetCommitRequestV2', {
Protocol.define('OffsetCommitRequestV2', {
write: function (data) { // { groupId, generationId, memberId, retentionTime, topics }

@@ -107,3 +110,3 @@ this

protocol.define('OffsetCommitResponseTopicItem', {
Protocol.define('OffsetCommitResponseTopicItem', {
read: function () {

@@ -116,3 +119,3 @@ this

protocol.define('OffsetCommitResponsePartitionItem', {
Protocol.define('OffsetCommitResponsePartitionItem', {
read: function () {

@@ -126,3 +129,3 @@ this

// v0, v1 and v2
protocol.define('OffsetCommitResponse', {
Protocol.define('OffsetCommitResponse', {
read: function () {

@@ -136,3 +139,3 @@ this

protocol.define('OffsetFetchRequestTopicItem', {
Protocol.define('OffsetFetchRequestTopicItem', {
write: function (data) { // { topicName, partitions }

@@ -145,3 +148,3 @@ this

protocol.define('OffsetFetchRequest', {
Protocol.define('OffsetFetchRequest', {
write: function (data) { // { apiVersion, groupId, topics }

@@ -160,3 +163,3 @@ this

protocol.define('OffsetFetchResponseTopicItem', {
Protocol.define('OffsetFetchResponseTopicItem', {
read: function () {

@@ -169,3 +172,3 @@ this

protocol.define('OffsetFetchResponsePartitionItem', {
Protocol.define('OffsetFetchResponsePartitionItem', {
read: function () {

@@ -180,3 +183,3 @@ this

protocol.define('OffsetFetchResponse', {
Protocol.define('OffsetFetchResponse', {
read: function () {

@@ -183,0 +186,0 @@ this

'use strict';
var protocol = require('bin-protocol');
var Protocol = require('./index');
var globals = require('./globals');

@@ -12,3 +12,3 @@

protocol.define('OffsetRequestPartitionItem', {
Protocol.define('OffsetRequestPartitionItem', {
write: function (data) { // { partition, time, maxNumberOfOffsets }

@@ -22,3 +22,3 @@ this

protocol.define('OffsetRequestTopicItem', {
Protocol.define('OffsetRequestTopicItem', {
write: function (data) { // { topicName, partitions }

@@ -31,3 +31,3 @@ this

protocol.define('OffsetRequest', {
Protocol.define('OffsetRequest', {
write: function (data) { // { topics }

@@ -46,3 +46,3 @@ this

protocol.define('OffsetResponseTopicItem', {
Protocol.define('OffsetResponseTopicItem', {
read: function () {

@@ -55,3 +55,3 @@ this

protocol.define('OffsetResponsePartitionItem', {
Protocol.define('OffsetResponsePartitionItem', {
read: function () {

@@ -65,3 +65,3 @@ this

protocol.define('OffsetResponse', {
Protocol.define('OffsetResponse', {
read: function () {

@@ -68,0 +68,0 @@ this

'use strict';
var protocol = require('bin-protocol');
var Protocol = require('./index');
var globals = require('./globals');

@@ -12,3 +12,3 @@

protocol.define('ProduceRequestPartitionItem', {
Protocol.define('ProduceRequestPartitionItem', {
write: function (data) { // {partition, messageSet}

@@ -28,3 +28,3 @@ var _o1, _o2;

protocol.define('ProduceRequestTopicItem', {
Protocol.define('ProduceRequestTopicItem', {
write: function (data) { // {topicName, partitions}

@@ -37,3 +37,3 @@ this

protocol.define('ProduceRequest', {
Protocol.define('ProduceRequest', {
write: function (data) { // { requiredAcks, timeout, topics }

@@ -53,3 +53,3 @@ this

protocol.define('ProduceResponseTopicItem', {
Protocol.define('ProduceResponseTopicItem', {
read: function () {

@@ -62,3 +62,3 @@ this

protocol.define('ProduceResponsePartitionItem', {
Protocol.define('ProduceResponsePartitionItem', {
read: function () {

@@ -72,3 +72,3 @@ this

protocol.define('ProduceResponse', {
Protocol.define('ProduceResponse', {
read: function () {

@@ -75,0 +75,0 @@ this

@@ -10,3 +10,3 @@ 'use strict';

function SimpleConsumer(options) {
this.options = _.partialRight(_.merge, _.defaults)(options || {}, {
this.options = _.defaultsDeep(options || {}, {
groupId: 'no-kafka-group-v0'

@@ -13,0 +13,0 @@ });

@@ -9,3 +9,3 @@ {

},
"version": "1.0.2",
"version": "1.1.0",
"main": "./lib/index.js",

@@ -12,0 +12,0 @@ "keywords": ["kafka"],

@@ -207,3 +207,46 @@ [![Build Status](https://travis-ci.org/oleksiyk/kafka.png)](https://travis-ci.org/oleksiyk/kafka)

## GroupAdmin (consumer groups API)
Offes two methods:
* `listGroups` - list existing consumer groups
* `describeGroup` - describe existing group by its id
Example:
```javascript
var admin = new Kafka.GroupAdmin();
return admin.init().then(function(){
return admin.listGroups().then(function(groups){
// [ { groupId: 'no-kafka-admin-test-group', protocolType: 'consumer' } ]
return admin.describeGroup('no-kafka-admin-test-group').then(function(group){
/*
{ error: null,
groupId: 'no-kafka-admin-test-group',
state: 'Stable',
protocolType: 'consumer',
protocol: 'TestStrategy',
members:
[ { memberId: 'group-consumer-82646843-b4b8-4e91-94c9-b4708c8b05e8',
clientId: 'group-consumer',
clientHost: '/192.168.1.4',
version: 0,
subscriptions: [ 'kafka-test-topic'],
metadata: <Buffer 63 6f 6e 73 75 6d 65 72 2d 6d 65 74 61 64 61 74 61>,
memberAssignment:
{ _blength: 44,
version: 0,
partitionAssignment:
[ { topic: 'kafka-test-topic',
partitions: [ 0, 1, 2 ] },
],
metadata: null } },
] }
*/
})
});
});
```
## Logging

@@ -210,0 +253,0 @@

@@ -16,2 +16,4 @@ 'use strict';

var maxBytesTestMessagesSize;
describe('SimpleConsumer', function () {

@@ -69,2 +71,24 @@ before(function () {

it('should correctly encode/decode utf8 string message value', function () {
dataListenerSpy.reset();
return producer.send({
topic: 'kafka-test-topic',
partition: 0,
message: { value: '人人生而自由,在尊嚴和權利上一律平等。' }
})
.delay(100)
.then(function () {
/* jshint expr: true */
dataListenerSpy.should.have.been.called; // eslint-disable-line
dataListenerSpy.lastCall.args[0].should.be.an('array').and.have.length(1);
dataListenerSpy.lastCall.args[1].should.be.a('string', 'kafka-test-topic');
dataListenerSpy.lastCall.args[2].should.be.a('number', 0);
dataListenerSpy.lastCall.args[0][0].should.be.an('object');
dataListenerSpy.lastCall.args[0][0].should.have.property('message').that.is.an('object');
dataListenerSpy.lastCall.args[0][0].message.should.have.property('value');
dataListenerSpy.lastCall.args[0][0].message.value.toString('utf8').should.be.eql('人人生而自由,在尊嚴和權利上一律平等。');
});
});
it('offset() should return last offset', function () {

@@ -100,2 +124,4 @@ return consumer.offset('kafka-test-topic', 0).then(function (offset) {

dataListenerSpy.lastCall.args[0][1].message.value.toString('utf8').should.be.eql('p001');
// save for next test
maxBytesTestMessagesSize = dataListenerSpy.lastCall.args[0][0].messageSize + dataListenerSpy.lastCall.args[0][1].messageSize;
});

@@ -110,3 +136,5 @@ });

return consumer.offset('kafka-test-topic', 0).then(function (offset) {
return consumer.subscribe('kafka-test-topic', 0, { offset: offset - 2, maxBytes: 30 })
// ask for maxBytes that is only 1 byte less then required for both last messages
var maxBytes = 2 * (8 + 4) + maxBytesTestMessagesSize - 1;
return consumer.subscribe('kafka-test-topic', 0, { offset: offset - 2, maxBytes: maxBytes })
.delay(200)

@@ -143,13 +171,23 @@ .then(function () {

metadata: 'm2'
},
{
topic: 'kafka-test-topic',
partition: 2,
offset: 3,
metadata: 'm3'
}
]).then(function (result) {
result.should.be.an('array').that.has.length(2);
result.should.be.an('array').that.has.length(3);
result[0].should.be.an('object');
result[1].should.be.an('object');
result[2].should.be.an('object');
result[0].should.have.property('topic', 'kafka-test-topic');
result[1].should.have.property('topic', 'kafka-test-topic');
result[2].should.have.property('topic', 'kafka-test-topic');
result[0].should.have.property('partition').that.is.a('number');
result[1].should.have.property('partition').that.is.a('number');
result[2].should.have.property('partition').that.is.a('number');
result[0].should.have.property('error', null);
result[1].should.have.property('error', null);
result[2].should.have.property('error', null);
});

@@ -168,19 +206,29 @@ });

partition: 1
},
{
topic: 'kafka-test-topic',
partition: 2
}
]).then(function (result) {
result.should.be.an('array').that.has.length(2);
result.should.be.an('array').that.has.length(3);
result[0].should.be.an('object');
result[1].should.be.an('object');
result[2].should.be.an('object');
result[0].should.have.property('topic', 'kafka-test-topic');
result[1].should.have.property('topic', 'kafka-test-topic');
result[2].should.have.property('topic', 'kafka-test-topic');
result[0].should.have.property('partition').that.is.a('number');
result[1].should.have.property('partition').that.is.a('number');
result[2].should.have.property('partition').that.is.a('number');
result[0].should.have.property('offset').that.is.a('number');
result[1].should.have.property('offset').that.is.a('number');
result[2].should.have.property('offset').that.is.a('number');
result[0].should.have.property('error', null);
result[1].should.have.property('error', null);
result[2].should.have.property('error', null);
_.find(result, { topic: 'kafka-test-topic', partition: 0 }).offset.should.be.eql(1);
_.find(result, { topic: 'kafka-test-topic', partition: 1 }).offset.should.be.eql(2);
_.find(result, { topic: 'kafka-test-topic', partition: 2 }).offset.should.be.eql(3);
});
});
});

@@ -226,33 +226,2 @@ 'use strict';

it('should list groups', function () {
return consumers[0].listGroups().then(function (groups) {
groups.should.be.an('array').and.have.length(1);
groups[0].should.be.an('object');
groups[0].should.have.property('groupId', consumers[0].options.groupId);
groups[0].should.have.property('protocolType', 'consumer');
});
});
it('should describe group', function () {
return consumers[0].describeGroup().then(function (group) {
group.should.be.an('object');
group.should.have.property('error', null);
group.should.have.property('groupId', consumers[0].options.groupId);
group.should.have.property('state');
group.should.have.property('protocolType', 'consumer');
group.should.have.property('protocol', 'TestStrategy');
group.should.have.property('members').that.is.an('array');
group.members.should.have.length(3);
group.members[0].should.be.an('object');
group.members[0].should.have.property('memberId').that.is.a('string');
group.members[0].should.have.property('clientId').that.is.a('string');
group.members[0].should.have.property('clientHost').that.is.a('string');
group.members[0].should.have.property('version').that.is.a('number');
group.members[0].should.have.property('subscriptions').that.is.an('array');
group.members[0].should.have.property('metadata');
group.members[0].should.have.property('memberAssignment').that.is.a('object');
group.members[0].memberAssignment.should.have.property('partitionAssignment').that.is.an('array');
});
});
it('should not log errors on clean shutdown', function () {

@@ -259,0 +228,0 @@ var spy = sinon.spy(function () {});

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc