Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

kafka-node

Package Overview
Dependencies
Maintainers
3
Versions
113
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

kafka-node - npm Package Compare versions

Comparing version 2.5.0 to 2.6.0

9

CHANGELOG.md
# kafka-node CHANGELOG
## 2018-04-24, Version 2.6.0
* Fix issue during the initial connection phase can end prematurely when metadata request failed [#920](https://github.com/SOHU-Co/kafka-node/pull/920)
* Add `addTopics` method to the `ConsumerGroup` [#914](https://github.com/SOHU-Co/kafka-node/pull/914)
* Fix issue where yielding a result in `onRebalance` in `ConsumerGroup` leads to an exception being thrown [#922](https://github.com/SOHU-Co/kafka-node/pull/922)
* Add support for Kafka Fetch versions 1 and 2 this enables produced timestamps to be read from the Consumer (Kafka 0.10+). Fetches also now share socket to the broker with other kafka requests (previously fetches were on a dedicated socket) [#871](https://github.com/SOHU-Co/kafka-node/pull/871)
* Add support to auto commit on first join `ConsumerGroup` configured using `commitOffsetsOnFirstJoin` [#897](https://github.com/SOHU-Co/kafka-node/pull/897)
* Add support for Buffers as keys, which is useful for Avro encoded keys [#932](https://github.com/SOHU-Co/kafka-node/pull/932)
## 2018-04-09, Version 2.5.0

@@ -4,0 +13,0 @@ * Explicitly cast key to string in hashCode function for `KeyedPartitioner` [#870](https://github.com/SOHU-Co/kafka-node/pull/870)

19

lib/client.js

@@ -179,2 +179,6 @@ 'use strict';

broker.socket.end();
setImmediate(function () {
broker.socket.destroy();
broker.socket.unref();
});
});

@@ -190,6 +194,4 @@ };

Client.prototype.sendFetchRequest = function (consumer, payloads, fetchMaxWaitMs, fetchMinBytes, maxTickMessages) {
var self = this;
var encoder = protocol.encodeFetchRequest(fetchMaxWaitMs, fetchMinBytes);
var decoder = protocol.decodeFetchResponse(function (err, type, message) {
Client.prototype._createMessageHandler = function (consumer) {
return (err, type, message) => {
if (err) {

@@ -199,3 +201,3 @@ if (err.message === 'OffsetOutOfRange') {

} else if (err.message === 'NotLeaderForPartition' || err.message === 'UnknownTopicOrPartition') {
return self.emit('brokersChanged');
return this.emit('brokersChanged');
}

@@ -217,4 +219,9 @@

}
}, maxTickMessages);
};
};
Client.prototype.sendFetchRequest = function (consumer, payloads, fetchMaxWaitMs, fetchMinBytes, maxTickMessages) {
var encoder = protocol.encodeFetchRequest(fetchMaxWaitMs, fetchMinBytes);
var decoder = protocol.decodeFetchResponse(this._createMessageHandler(consumer), maxTickMessages);
this.send(payloads, encoder, decoder, function (err) {

@@ -221,0 +228,0 @@ if (err) {

@@ -47,2 +47,3 @@ 'use strict';

retryMinTimeout: 1000,
commitOffsetsOnFirstJoin: true,
connectOnReady: true,

@@ -257,2 +258,6 @@ migrateHLC: false,

protocol = _.find(this.protocols, { name: protocol });
if (!protocol) {
callback(new Error('Unknown group protocol: ' + protocol));
return;
}

@@ -285,2 +290,6 @@ var self = this;

logger.debug('joinGroupResponse %j from %s', joinGroupResponse, this.client.clientId);
if (!joinGroupResponse.memberId || !joinGroupResponse.generationId) {
callback(new Error('Invalid joinGroupResponse: ' + JSON.stringify(joinGroupResponse)));
return;
}

@@ -327,2 +336,3 @@ this.isLeader = joinGroupResponse.leaderId === joinGroupResponse.memberId;

let noOffset;
async.waterfall(

@@ -336,3 +346,3 @@ [

var noOffset = topicPartitionList.some(function (tp) {
noOffset = topicPartitionList.some(function (tp) {
return offsets[tp.topic][tp.partition] === -1;

@@ -399,3 +409,9 @@ });

});
callback(null, true);
if (noOffset && self.options.commitOffsetsOnFirstJoin) {
self.commit(true, (err) => {
callback(err, !err ? true : null);
});
} else {
callback(null, true);
}
}

@@ -451,3 +467,8 @@ ],

if (typeof self.options.onRebalance === 'function') {
self.options.onRebalance(self.generationId != null && self.memberId != null, callback);
self.options.onRebalance(self.generationId != null && self.memberId != null, function (error) {
if (error) {
return callback(error);
}
callback(null);
});
return;

@@ -626,2 +647,25 @@ }

ConsumerGroup.prototype.addTopics = function (topics, cb) {
topics = Array.isArray(topics) ? topics : [topics];
if (!this.client.ready) {
this.client.once('ready', () => this.addTopics(topics, cb));
return;
}
async.series([
callback => this.client.topicExists(topics, callback),
callback => (this.options.autoCommit && this.generationId != null && this.memberId)
? this.commit(true, callback)
: callback(null),
callback => this.leaveGroup(callback),
callback => {
this.topics = this.topics.concat(topics);
this.setupProtocols(this.options.protocol);
this.connect();
callback(null);
}
], error => error ? cb(error) : cb(null, `Add Topics ${topics.join(',')} Successfully`));
};
ConsumerGroup.prototype.close = function (force, cb) {

@@ -628,0 +672,0 @@ var self = this;

@@ -68,2 +68,3 @@ 'use strict';

this.consumerGroup.on('error', error => this.emit('error', error));
this.consumerGroup.on('connect', () => this.emit('connect'));
this.consumerGroup.on('message', message => {

@@ -70,0 +71,0 @@ this.messageBuffer.push(message);

@@ -570,7 +570,13 @@ 'use strict';

HighLevelConsumer.prototype.fetch = function () {
if (!this.ready || this.rebalancing || this.paused) {
if (!this.ready || this.rebalancing || this.paused || this.closing) {
return;
}
this.client.sendFetchRequest(this, this.topicPayloads, this.options.fetchMaxWaitMs, this.options.fetchMinBytes, this.options.maxTickMessages);
this.client.sendFetchRequest(
this,
this.topicPayloads,
this.options.fetchMaxWaitMs,
this.options.fetchMinBytes,
this.options.maxTickMessages
);
};

@@ -577,0 +583,0 @@

@@ -129,25 +129,37 @@ 'use strict';

logger.debug(`Connect attempt ${currentAttempt}`);
this.connectToBrokers(this.initialHosts, error => {
if (connect.retry(error)) {
return;
}
this.connecting = false;
async.series(
[
callback => {
this.connectToBrokers(this.initialHosts, callback);
},
if (error) {
logger.debug('exhausted retries. Main error', connect.mainError());
this.emit('error', connect.mainError());
return;
}
callback => {
this.loadMetadataForTopics([], (error, result) => {
if (error) {
logger.debug('loadMetadataForTopics after connect failed', error);
return callback(error);
}
this.updateMetadatas(result, true);
callback(null);
});
}
],
error => {
if (connect.retry(error)) {
return;
}
this.loadMetadataForTopics([], (error, result) => {
this.connecting = false;
if (error) {
logger.debug('loadMetadataForTopics after connect failed', error);
return this.emit('error', error);
logger.debug('exhausted retries. Main error', connect.mainError());
this.emit('error', connect.mainError());
return;
}
this.updateMetadatas(result, true);
this.ready = true;
this.emit('ready');
});
});
}
);
});

@@ -293,3 +305,3 @@ };

KafkaClient.prototype.refreshBrokerMetadata = function (callback) {
if (this.refreshingMetadata) {
if (this.refreshingMetadata || this.closing) {
return;

@@ -554,2 +566,7 @@ }

}
if (_.isEmpty(versions)) {
return callback(new Error(`getApiVersions response was empty for broker: ${broker}`));
}
logger.debug('setting api support to %j', versions);

@@ -768,6 +785,12 @@ broker.apiSupport = versions;

if (!broker.isReady()) {
callback(new Error('Broker is not ready (apiSuppport is not set)'));
return;
}
const coder = getSupportedForRequestType(broker, request.type);
const encoder = request.data.args != null ? coder.encoder.apply(null, request.data.args) : coder.encoder;
const decoder = coder.decoder;
const decoder =
request.data.decoderArgs != null ? coder.decoder.apply(null, request.data.decoderArgs) : coder.decoder;

@@ -787,3 +810,3 @@ const requestData = encoder(this.clientId, correlationId, payload);

const broker = this.brokerForLeader(leader);
if (broker.apiSupport == null) {
if (!broker.isReady()) {
logger.debug('missing apiSupport waiting until broker is ready...');

@@ -849,2 +872,36 @@ this.waitUntilReady(broker, callback);

KafkaClient.prototype.sendFetchRequest = function (
consumer,
payloads,
fetchMaxWaitMs,
fetchMinBytes,
maxTickMessages,
callback
) {
if (callback == null) {
callback = _.noop;
}
async.series(
[
callback => {
this.verifyPayloadsHasLeaders(payloads, callback);
},
callback => {
const request = {
type: 'fetch',
data: {
payloads: payloads,
args: [fetchMaxWaitMs, fetchMinBytes],
decoderArgs: [this._createMessageHandler(consumer), maxTickMessages]
}
};
this.sendRequest(request, callback);
}
],
callback
);
};
KafkaClient.prototype.sendProduceRequest = function (payloads, requireAcks, ackTimeoutMs, callback) {

@@ -851,0 +908,0 @@ async.series(

@@ -48,6 +48,7 @@ 'use strict';

// Copyright The Obvious Corporation.
KeyedPartitioner.prototype.hashCode = function (string) {
KeyedPartitioner.prototype.hashCode = function (stringOrBuffer) {
let hash = 0;
if (string) {
const length = string.toString().length;
if (stringOrBuffer) {
const string = stringOrBuffer.toString();
const length = string.length;

@@ -54,0 +55,0 @@ for (let i = 0; i < length; i++) {

@@ -47,2 +47,14 @@ 'use strict';

function encodeFetchRequestV1 (maxWaitMs, minBytes) {
return function encodeFetchRequest (clientId, correlationId, payloads) {
return _encodeFetchRequest(clientId, correlationId, payloads, maxWaitMs, minBytes, 1);
};
}
function encodeFetchRequestV2 (maxWaitMs, minBytes) {
return function encodeFetchRequest (clientId, correlationId, payloads) {
return _encodeFetchRequest(clientId, correlationId, payloads, maxWaitMs, minBytes, 2);
};
}
function decodeTopics (decodePartitions) {

@@ -61,5 +73,5 @@ return function (end, vars) {

function _encodeFetchRequest (clientId, correlationId, payloads, maxWaitMs, minBytes) {
function _encodeFetchRequest (clientId, correlationId, payloads, maxWaitMs, minBytes, version) {
payloads = groupByTopic(payloads);
var request = encodeRequestHeader(clientId, correlationId, REQUEST_TYPE.fetch);
var request = encodeRequestHeader(clientId, correlationId, REQUEST_TYPE.fetch, version);
var topics = Object.keys(payloads);

@@ -86,6 +98,12 @@

return function (resp) {
return _decodeFetchResponse(resp, cb, maxTickMessages);
return _decodeFetchResponse(resp, cb, maxTickMessages, 0);
};
}
function decodeFetchResponseV1 (cb, maxTickMessages) {
return function (resp) {
return _decodeFetchResponse(resp, cb, maxTickMessages, 1);
};
}
function createGroupError (errorCode) {

@@ -106,3 +124,3 @@ if (errorCode == null || errorCode === 0) {

function _decodeFetchResponse (resp, cb, maxTickMessages) {
function _decodeFetchResponse (resp, cb, maxTickMessages, version) {
var topics = {};

@@ -112,2 +130,7 @@ Binary.parse(resp)

.word32bs('correlationId')
.tap(function () {
if (version >= 1) {
this.word32bs('throttleTime');
}
})
.word32bs('topicNum')

@@ -167,2 +190,3 @@ .loop(decodeTopics(decodePartitions));

this.word64bs('timestamp');
cur += 8;
}

@@ -1097,3 +1121,5 @@ })

request.Int32BE(groups.length);
groups.forEach(groupId => { request.Int16BE(groupId.length).string(groupId); });
groups.forEach(groupId => {
request.Int16BE(groupId.length).string(groupId);
});

@@ -1106,7 +1132,3 @@ return encodeRequestWithLength(request.make());

Binary.parse(resp)
.word32bs('size')
.word32bs('correlationId')
.word32bs('describeNum')
.loop(decodeDescriptions);
Binary.parse(resp).word32bs('size').word32bs('correlationId').word32bs('describeNum').loop(decodeDescriptions);

@@ -1267,2 +1289,5 @@ function decodeDescriptions (end, vars) {

exports.decodeFetchResponse = decodeFetchResponse;
exports.encodeFetchRequestV1 = encodeFetchRequestV1;
exports.decodeFetchResponseV1 = decodeFetchResponseV1;
exports.encodeFetchRequestV2 = encodeFetchRequestV2;

@@ -1269,0 +1294,0 @@ exports.encodeOffsetCommitRequest = encodeOffsetCommitRequest;

@@ -12,3 +12,7 @@ 'use strict';

],
fetch: [[p.encodeFetchRequest, p.decodeFetchResponse]],
fetch: [
[p.encodeFetchRequest, p.decodeFetchResponse],
[p.encodeFetchRequestV1, p.decodeFetchResponseV1],
[p.encodeFetchRequestV2, p.decodeFetchResponseV1]
],
offset: [[p.encodeOffsetRequest, p.decodeOffsetResponse]],

@@ -45,2 +49,7 @@ metadata: [[p.encodeMetadataRequest, p.decodeMetadataResponse]],

const API_SUPPORTED_IN_KAFKA_0_9 = {
fetch: {
min: 0,
max: 1,
usable: 1
},
produce: {

@@ -47,0 +56,0 @@ min: 0,

@@ -43,2 +43,6 @@ 'use strict';

BrokerWrapper.prototype.isReady = function () {
return this.apiSupport != null;
};
BrokerWrapper.prototype.isIdle = function () {

@@ -57,2 +61,8 @@ return Date.now() - this._lastWrite >= this.idleConnectionMs;

BrokerWrapper.prototype.toString = function () {
return `[${this.constructor.name} ${
this.socket.addr
} (connected: ${this.isConnected()}) (ready: ${this.isReady()}) (idle: ${this.isIdle()})]`;
};
module.exports = BrokerWrapper;

@@ -17,3 +17,3 @@ {

"bugs": "https://github.com/SOHU-co/kafka-node/issues",
"version": "2.5.0",
"version": "2.6.0",
"main": "kafka.js",

@@ -20,0 +20,0 @@ "license": "MIT",

@@ -58,2 +58,5 @@ Kafka-node

* Connect directly to brokers (Kafka 0.9+)
* Administrative APIs
* List Groups
* Describe Groups

@@ -90,2 +93,3 @@ # Install Kafka

* `maxAsyncRequests` : maximum async operations at a time toward the kafka cluster. default: 10
* `sslOptions`: **Object**, options to be passed to the tls broker sockets, ex. { rejectUnauthorized: false } (Kafka +0.9)

@@ -146,3 +150,3 @@ ### Example

messages: ['message body'], // multi messages should be a array, single message can be just a string or a KeyedMessage instance
key: 'theKey', // only needed when using keyed partitioner
key: 'theKey', // string or buffer, only needed when using keyed partitioner
partition: 0, // default 0

@@ -244,3 +248,3 @@ attributes: 2, // default: 0

messages: ['message body'], // multi messages should be a array, single message can be just a string,
key: 'theKey', // only needed when using keyed partitioner
key: 'theKey', // string or buffer, only needed when using keyed partitioner
attributes: 1,

@@ -720,3 +724,3 @@ timestamp: Date.now() // <-- defaults to Date.now() (only available with kafka v0.10 and KafkaClient only)

fromOffset: 'latest', // default
commitOffsetsOnFirstJoin: true, // on the very first time this consumer group subscribes to a topic, record the offset returned in fromOffset (latest/earliest)
// how to recover from OutOfRangeOffset error (where save offset is past server retention) accepts same value as fromOffset

@@ -723,0 +727,0 @@ outOfRangeOffset: 'earliest', // default

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc