kafka-node npm package

Comparing version 2.4.1 to 2.5.0

CHANGELOG.md
```diff
 # kafka-node CHANGELOG
+## 2018-04-09, Version 2.5.0
+* Explicitly cast key to string in the hashCode function for `KeyedPartitioner` [#870](https://github.com/SOHU-Co/kafka-node/pull/870)
+* In the consumer fetch loop, clear `socket.waiting` before invoking callbacks [#819](https://github.com/SOHU-Co/kafka-node/pull/819)
+* Add support for IPv6 [#818](https://github.com/SOHU-Co/kafka-node/pull/818)
+* Clear the internal topicPayload array if no topic partitions are assigned to the `ConsumerGroup` [#888](https://github.com/SOHU-Co/kafka-node/pull/888)
+* Fix stale commit queue for `ConsumerGroupStream` [#891](https://github.com/SOHU-Co/kafka-node/pull/891)
+* In the rebalance case, `ConsumerGroup` will try to commit before joining a group; added an `onRebalance` callback for manual-commit users [#889](https://github.com/SOHU-Co/kafka-node/pull/889)
 ## 2018-02-06, Version 2.4.1
```
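Among these, IPv6 support (#818) is the most user-visible change: broker address strings may now carry bracketed IPv6 hosts. A minimal connection sketch, assuming a broker is actually listening on the IPv6 loopback (the address and port here are placeholders):

```js
const kafka = require('kafka-node');

// Placeholder address: a broker listening on the IPv6 loopback, port 9092.
const client = new kafka.KafkaClient({ kafkaHost: '[::1]:9092' });

client.on('ready', () => console.log('connected over IPv6'));
client.on('error', (err) => console.error('connection failed', err));
```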

lib/admin.js

```diff
@@ -5,5 +5,6 @@ 'use strict';
 var util = require('util');
-var events = require('events');
+var EventEmitter = require('events');
-var Admin = function (kafkaClient) {
+function Admin (kafkaClient) {
+  EventEmitter.call(this);
   if (!(kafkaClient instanceof KafkaClient)) {
@@ -26,4 +27,4 @@ throw new Error("'Admin' only accepts 'KafkaClient' for its kafka client.");
   });
-};
-util.inherits(Admin, events.EventEmitter);
+}
+util.inherits(Admin, EventEmitter);
@@ -30,0 +31,0 @@ Admin.prototype.listGroups = function (cb) {
```
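This `events`-to-`EventEmitter` rename plus the added `EventEmitter.call(this)` is a pattern repeated across nearly every file in this release: `require('events')` already exports the `EventEmitter` class itself, and `util.inherits` only wires up the prototype chain, so invoking the parent constructor completes the classic pre-ES6 inheritance idiom. A standalone sketch of the pattern (the `Thing` type is illustrative, not part of kafka-node):

```js
const util = require('util');
const EventEmitter = require('events'); // the events module *is* the EventEmitter class

function Thing () {
  // Initialize per-instance emitter state; util.inherits alone does not do this.
  EventEmitter.call(this);
}
util.inherits(Thing, EventEmitter);

const t = new Thing();
t.on('ping', () => console.log('pong'));
t.emit('ping'); // => pong
```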

lib/baseProducer.js

```diff
@@ -5,3 +5,3 @@ 'use strict';
 var util = require('util');
-var events = require('events');
+var EventEmitter = require('events');
 var _ = require('lodash');
@@ -57,2 +57,3 @@ var protocol = require('./protocol');
 function BaseProducer (client, options, defaultPartitionerType, customPartitioner) {
+  EventEmitter.call(this);
   options = options || {};
@@ -80,3 +81,3 @@
-util.inherits(BaseProducer, events.EventEmitter);
+util.inherits(BaseProducer, EventEmitter);
@@ -83,0 +84,0 @@ BaseProducer.prototype.connect = function () {
```

lib/client.js

```diff
@@ -10,3 +10,3 @@ 'use strict';
 var retry = require('retry');
-var events = require('events');
+var EventEmitter = require('events');
 var errors = require('./errors');
@@ -51,6 +51,8 @@ var getCodec = require('./codec');
 */
-var Client = function (connectionString, clientId, zkOptions, noAckBatchOptions, sslOptions) {
+function Client (connectionString, clientId, zkOptions, noAckBatchOptions, sslOptions) {
   if (this instanceof Client === false) {
     return new Client(connectionString, clientId, zkOptions, noAckBatchOptions, sslOptions);
   }
+  EventEmitter.call(this);
   this.setMaxListeners(20); // waitUntilReady can trigger lots of listeners under high activity, so increase threshold
@@ -78,5 +80,5 @@ this.sslOptions = sslOptions;
   this.connect();
-};
+}
-util.inherits(Client, events.EventEmitter);
+util.inherits(Client, EventEmitter);
@@ -765,2 +767,6 @@ Client.prototype.connect = function () {
   if (buffer.length >= size) {
+    if (socket.longpolling) {
+      socket.waiting = false;
+    }
     var resp = buffer.shallowSlice(0, size);
@@ -779,5 +785,2 @@ var correlationId = resp.readUInt32BE(4);
     buffer.consume(size);
-    if (socket.longpolling) {
-      socket.waiting = false;
-    }
   } else {
@@ -784,0 +787,0 @@ return;
```

lib/consumer.js

```diff
@@ -5,3 +5,3 @@ 'use strict';
 var _ = require('lodash');
-var events = require('events');
+var EventEmitter = require('events');
 var logger = require('./logging')('kafka-node:Consumer');
@@ -30,3 +30,4 @@ var utils = require('./utils');
-var Consumer = function (client, topics, options) {
+function Consumer (client, topics, options) {
+  EventEmitter.call(this);
   if (!topics) {
@@ -51,4 +52,4 @@ throw new Error('Must have payloads');
   }
-};
-util.inherits(Consumer, events.EventEmitter);
+}
+util.inherits(Consumer, EventEmitter);
@@ -55,0 +56,0 @@ Consumer.prototype.buildPayloads = function (payloads) {
```

lib/consumerGroup.js

```diff
@@ -6,3 +6,3 @@ 'use strict';
 const EventEmitter = require('events');
-const highLevelConsumer = require('./highLevelConsumer');
+const HighLevelConsumer = require('./highLevelConsumer');
 const Client = require('./client');
@@ -51,2 +51,3 @@ const KafkaClient = require('./kafkaClient');
   migrateRolling: true,
+  onRebalance: null,
   protocol: ['roundrobin']
@@ -56,3 +57,3 @@ };
 function ConsumerGroup (memberOptions, topics) {
-  EventEmitter.call(this);
+  EventEmitter.call(this); // Intentionally not calling HighLevelConsumer to avoid constructor logic
   const self = this;
@@ -205,3 +206,3 @@ this.options = _.defaults(memberOptions || {}, DEFAULTS);
-util.inherits(ConsumerGroup, highLevelConsumer);
+util.inherits(ConsumerGroup, HighLevelConsumer);
@@ -403,2 +404,3 @@ ConsumerGroup.prototype.reconnectIfNeeded = function () {
   } else {
+    self.topicPayloads = []; // no partitions assigned
@@ -447,2 +449,21 @@ callback(null, false);
+    function (callback) {
+      if (typeof self.options.onRebalance === 'function') {
+        self.options.onRebalance(self.generationId != null && self.memberId != null, callback);
+        return;
+      }
+      callback(null);
+    },
+    function (callback) {
+      if (self.options.autoCommit && self.generationId != null && self.memberId) {
+        self.commit(true, function (error) {
+          if (error) {
+            return callback(error);
+          }
+          callback(null);
+        });
+        return;
+      }
+      callback(null);
+    },
     function (callback) {
       if (self.client.coordinatorId) {
@@ -449,0 +470,0 @@ return callback(null, null);
```

lib/consumerGroupMigrator.js

```diff
@@ -9,3 +9,3 @@ 'use strict';
 const _ = require('lodash');
-const EventEmitter = require('events').EventEmitter;
+const EventEmitter = require('events');
 const NUMER_OF_TIMES_TO_VERIFY = 4;
@@ -12,0 +12,0 @@ const VERIFY_WAIT_TIME_MS = 1500;
```

lib/consumerGroupStream.js

```diff
@@ -22,3 +22,3 @@ 'use strict';
   partition: partition,
-  offset: offset + 1,
+  offset: offset,
   metadata: 'm'
@@ -37,2 +37,3 @@ });
   _.defaultsDeep(options || {}, DEFAULTS);
+  const self = this;
@@ -43,2 +44,21 @@ this.autoCommit = options.autoCommit;
   options.autoCommit = false;
+  const originalOnRebalance = options.onRebalance;
+  options.onRebalance = function (isAlreadyMember, callback) {
+    const autoCommit = _.once(function (err) {
+      if (err) {
+        callback(err);
+      } else {
+        self.commit(null, true, callback);
+      }
+    });
+    if (typeof originalOnRebalance === 'function') {
+      try {
+        originalOnRebalance(isAlreadyMember, autoCommit);
+      } catch (e) {
+        autoCommit(e);
+      }
+    } else {
+      autoCommit();
+    }
+  };
@@ -79,3 +99,3 @@ this.consumerGroup = new ConsumerGroup(options, topics);
   if (message != null && message.offset !== -1) {
-    _.set(this.commitQueue, [message.topic, message.partition], message.offset);
+    _.set(this.commitQueue, [message.topic, message.partition], message.offset + 1);
   }
@@ -82,0 +102,0 @@
```
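The two commitQueue hunks move the `+ 1` from flush time to enqueue time. A committed offset in Kafka denotes the *next* message to consume, so storing `message.offset + 1` up front keeps queued entries correct even if they sit in the queue across a rebalance. A minimal sketch of the resulting bookkeeping (simplified from the diff; `commitQueue` here is a plain object standing in for the stream's internal state):

```js
const _ = require('lodash');

const commitQueue = {};

// Record a processed message: store the offset of the *next* message to read.
function enqueueCommit (message) {
  if (message != null && message.offset !== -1) {
    _.set(commitQueue, [message.topic, message.partition], message.offset + 1);
  }
}

enqueueCommit({ topic: 'example-topic', partition: 0, offset: 41 });
console.log(commitQueue); // { 'example-topic': { '0': 42 } }
```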

lib/highLevelConsumer.js

```diff
@@ -5,3 +5,3 @@ 'use strict';
 var _ = require('lodash');
-var events = require('events');
+var EventEmitter = require('events');
 var uuid = require('uuid');
@@ -38,2 +38,3 @@ var async = require('async');
 var HighLevelConsumer = function (client, topics, options) {
+  EventEmitter.call(this);
   if (!topics) {
@@ -67,3 +68,3 @@ throw new Error('Must have payloads');
 };
-util.inherits(HighLevelConsumer, events.EventEmitter);
+util.inherits(HighLevelConsumer, EventEmitter);
@@ -70,0 +71,0 @@ HighLevelConsumer.prototype.buildPayloads = function (payloads) {
```

lib/kafkaClient.js

```diff
@@ -5,2 +5,3 @@ 'use strict';
 const logger = require('./logging')('kafka-node:KafkaClient');
+const EventEmitter = require('events');
 const async = require('async');
@@ -42,6 +43,8 @@ const retry = require('retry');
   },
-  maxAsyncRequests: 10
+  maxAsyncRequests: 10,
+  noAckBatchOptions: null
 };
 const KafkaClient = function (options) {
+  EventEmitter.call(this); // Intentionally not calling Client to avoid constructor logic
   this.options = _.defaultsDeep(options || {}, DEFAULTS);
@@ -61,3 +64,3 @@
   this.clientId = this.options.clientId || 'kafka-node-client';
-  this.noAckBatchOptions = this.noAckBatchOptions;
+  this.noAckBatchOptions = this.options.noAckBatchOptions;
   this.brokers = {};
@@ -98,6 +101,9 @@ this.longpollingBrokers = {};
 function parseHost (hostString) {
-  const piece = hostString.split(':');
+  const ip = hostString.substring(0, hostString.lastIndexOf(':'));
+  const port = hostString.substring(hostString.lastIndexOf(':') + 1);
+  const isIpv6 = ip.match(/\[(.*)\]/);
+  const host = isIpv6 ? isIpv6[1] : ip;
   return {
-    host: piece[0],
-    port: piece[1]
+    host,
+    port
   };
@@ -104,0 +110,0 @@ }
```
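The rewritten `parseHost` splits on the *last* colon instead of the first, then unwraps the square brackets that IPv6 literals require; the old `split(':')` would mangle any address containing more than one colon. Reproduced standalone for illustration (this helper is internal to kafka-node, not an exported API):

```js
function parseHost (hostString) {
  const ip = hostString.substring(0, hostString.lastIndexOf(':'));
  const port = hostString.substring(hostString.lastIndexOf(':') + 1);
  const isIpv6 = ip.match(/\[(.*)\]/);
  const host = isIpv6 ? isIpv6[1] : ip;
  return { host, port };
}

console.log(parseHost('localhost:9092'));     // { host: 'localhost', port: '9092' }
console.log(parseHost('[::1]:9092'));         // { host: '::1', port: '9092' }
console.log(parseHost('[2001:db8::2]:9093')); // { host: '2001:db8::2', port: '9093' }
```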

lib/offset.js

```diff
@@ -5,5 +5,6 @@ 'use strict';
 var async = require('async');
-var events = require('events');
+var EventEmitter = require('events');
-var Offset = function (client) {
+function Offset (client) {
+  EventEmitter.call(this);
   var self = this;
@@ -22,4 +23,4 @@ this.client = client;
   });
-};
-util.inherits(Offset, events.EventEmitter);
+}
+util.inherits(Offset, EventEmitter);
@@ -26,0 +27,0 @@ Offset.prototype.fetch = function (payloads, cb) {
```

lib/partitioner.js

```diff
 'use strict';
-var util = require('util');
-var _ = require('lodash');
+const util = require('util');
+const _ = require('lodash');
-var Partitioner = function () {};
+function Partitioner () { }
-var DefaultPartitioner = function () {};
+function DefaultPartitioner () {
+  Partitioner.call(this);
+}
 util.inherits(DefaultPartitioner, Partitioner);
@@ -19,5 +21,6 @@
-var CyclicPartitioner = function () {
+function CyclicPartitioner () {
+  Partitioner.call(this);
   this.c = 0;
-};
+}
 util.inherits(CyclicPartitioner, Partitioner);
@@ -27,6 +30,8 @@
   if (_.isEmpty(partitions)) return 0;
-  return partitions[ this.c++ % partitions.length ];
+  return partitions[this.c++ % partitions.length];
 };
-var RandomPartitioner = function () {};
+function RandomPartitioner () {
+  Partitioner.call(this);
+}
 util.inherits(RandomPartitioner, Partitioner);
@@ -38,3 +43,5 @@
-var KeyedPartitioner = function () {};
+function KeyedPartitioner () {
+  Partitioner.call(this);
+}
 util.inherits(KeyedPartitioner, Partitioner);
@@ -45,7 +52,9 @@
 KeyedPartitioner.prototype.hashCode = function (string) {
-  var hash = 0;
-  var length = string.length;
-  for (var i = 0; i < length; i++) {
-    hash = ((hash * 31) + string.charCodeAt(i)) & 0x7fffffff;
+  let hash = 0;
+  if (string) {
+    const length = string.toString().length;
+    for (let i = 0; i < length; i++) {
+      hash = ((hash * 31) + string.charCodeAt(i)) & 0x7fffffff;
+    }
+  }
@@ -59,9 +68,10 @@
-  var index = this.hashCode(key) % partitions.length;
+  const index = this.hashCode(key) % partitions.length;
   return partitions[index];
 };
-var CustomPartitioner = function (partitioner) {
+function CustomPartitioner (partitioner) {
+  Partitioner.call(this);
   this.getPartition = partitioner;
-};
+}
 util.inherits(CustomPartitioner, Partitioner);
@@ -68,0 +78,0 @@
```
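The `hashCode` guard is the #870 fix: partition keys are not always strings (numbers and Buffers are common), and the old code read `string.length` directly, which throws for `null`/`undefined` and yields a constant hash of 0 for numbers, funneling every numeric key to the same partition. A minimal sketch of the idea behind the fix, with the cast made explicit up front (simplified, not the library's exact internals):

```js
function hashCode (key) {
  // Cast once so numeric or Buffer keys hash via their string form.
  const str = key == null ? '' : key.toString();
  let hash = 0;
  for (let i = 0; i < str.length; i++) {
    hash = ((hash * 31) + str.charCodeAt(i)) & 0x7fffffff;
  }
  return hash;
}

console.log(hashCode('my-key')); // stable hash for string keys
console.log(hashCode(42));       // numeric keys no longer need pre-casting by the caller
```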

lib/zookeeper.js

```diff
@@ -6,3 +6,3 @@ 'use strict';
 var async = require('async');
-var EventEmiter = require('events').EventEmitter;
+var EventEmitter = require('events');
 var logger = require('./logging')('kafka-node:zookeeper');
@@ -19,2 +19,3 @@
 var Zookeeper = function (connectionString, options) {
+  EventEmitter.call(this);
   this.client = zookeeper.createClient(connectionString, options);
@@ -32,3 +33,3 @@
-util.inherits(Zookeeper, EventEmiter);
+util.inherits(Zookeeper, EventEmitter);
@@ -35,0 +36,0 @@ Zookeeper.prototype.unregisterConsumer = function (groupId, consumerId, cb) {
```

package.json

```diff
@@ -17,3 +17,3 @@ {
   "bugs": "https://github.com/SOHU-co/kafka-node/issues",
-  "version": "2.4.1",
+  "version": "2.5.0",
   "main": "kafka.js",
@@ -20,0 +20,0 @@ "license": "MIT",
```

README.md

```diff
@@ -544,3 +544,3 @@ Kafka-node
 ## HighLevelConsumer
-⚠️ ***This consumer has been deprecated in the latest version of Kafka (0.10.1) and is likely to be removed in the future. Please use the ConsumerGroup instead.***
+⚠️ ***This consumer has been deprecated and is likely to be removed in the future. Please use the ConsumerGroup instead.***
@@ -721,3 +721,6 @@ ### HighLevelConsumer(client, payloads, options)
   migrateHLC: false, // for details please see Migration section below
-  migrateRolling: true
+  migrateRolling: true,
+  // Callback to allow consumers with autoCommit false a chance to commit before a rebalance finishes
+  // isAlreadyMember will be false on the first connection, and true on rebalances triggered after that
+  onRebalance: (isAlreadyMember, callback) => { callback(); } // or null
 };
```
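A usage sketch for the new option with manual commits; the broker, group, and topic names are placeholders, and error handling is kept minimal:

```js
const { ConsumerGroup } = require('kafka-node');

const consumer = new ConsumerGroup({
  kafkaHost: 'localhost:9092',    // placeholder broker
  groupId: 'manual-commit-group',
  autoCommit: false,
  onRebalance: (isAlreadyMember, callback) => {
    if (isAlreadyMember) {
      // Flush processed offsets before partitions are reassigned.
      consumer.commit(true, callback);
      return;
    }
    callback(); // first join: nothing to commit yet
  }
}, ['example-topic']);

consumer.on('message', (message) => {
  // ... process, then commit on your own schedule ...
});
```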

```diff
@@ -873,2 +876,9 @@
+### commit(message, force, callback)
+This method can be used to commit manually when `autoCommit` is set to `false`.
+
+* `message` the original message or an object with `{topic, partition, offset}`
+* `force` a commit even if there's a pending commit
+* `callback` (*optional*)
+
 ### close(callback)
```
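A sketch of manual committing with `ConsumerGroupStream` under this API (broker, group, and topic names are placeholders):

```js
const { ConsumerGroupStream } = require('kafka-node');

const stream = new ConsumerGroupStream({
  kafkaHost: 'localhost:9092',  // placeholder broker
  groupId: 'stream-group',
  autoCommit: false
}, ['example-topic']);

stream.on('data', (message) => {
  // ... process the message, then queue its offset for commit ...
  stream.commit(message, true, (err) => {
    if (err) console.error('commit failed', err);
  });
});
```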

```diff
@@ -919,2 +929,5 @@ Closes the `ConsumerGroup`. Calls `callback` when complete. If `autoCommit` is enabled calling close will also commit offsets consumed from the buffer.
 ### commit(groupId, payloads, cb)
+> ⚠️ **WARNING**: commits are made to ZooKeeper and are only compatible with `HighLevelConsumer`; they will **NOT** work with the new `ConsumerGroup`
 * `groupId`: consumer group
@@ -945,2 +958,5 @@ * `payloads`: **Array**, array of `OffsetCommitRequest`; `OffsetCommitRequest` is a JSON object like:
 ### fetchCommits(groupid, payloads, cb)
+> ⚠️ **WARNING**: commits are fetched from ZooKeeper and are only compatible with `HighLevelConsumer`; they will **NOT** work with the new `ConsumerGroup`
 Fetch the last committed offset in a topic of a specific consumer group
@@ -947,0 +963,0 @@
```
