kafka-node
Comparing version 2.5.0 to 2.6.0
 # kafka-node CHANGELOG

+## 2018-04-24, Version 2.6.0
+
+* Fix issue where the initial connection phase can end prematurely when the metadata request fails [#920](https://github.com/SOHU-Co/kafka-node/pull/920)
+* Add `addTopics` method to the `ConsumerGroup` [#914](https://github.com/SOHU-Co/kafka-node/pull/914)
+* Fix issue where yielding a result in `onRebalance` in `ConsumerGroup` leads to an exception being thrown [#922](https://github.com/SOHU-Co/kafka-node/pull/922)
+* Add support for Kafka Fetch API versions 1 and 2; this enables produce timestamps to be read from the Consumer (Kafka 0.10+). Fetches also now share a socket to the broker with other Kafka requests (previously fetches used a dedicated socket) [#871](https://github.com/SOHU-Co/kafka-node/pull/871)
+* Add support for auto-committing offsets on a `ConsumerGroup`'s first join, configured via `commitOffsetsOnFirstJoin` (see the usage sketch after this diff) [#897](https://github.com/SOHU-Co/kafka-node/pull/897)
+* Add support for Buffers as keys, which is useful for Avro-encoded keys [#932](https://github.com/SOHU-Co/kafka-node/pull/932)
+
 ## 2018-04-09, Version 2.5.0
@@ -4,0 +13,0 @@ * Explicitly cast key to string in hashCode function for `KeyedPartitioner` [#870](https://github.com/SOHU-Co/kafka-node/pull/870)
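As a usage sketch for the `commitOffsetsOnFirstJoin` entry above — a minimal, hypothetical consumer; the broker address, group id, and topic name are placeholders, not values taken from this diff:

```js
// Minimal sketch (kafka-node 2.6.0+). With fromOffset: 'latest', the very
// first join now commits the starting offset right away, so a consumer that
// restarts before its first regular commit resumes from where it joined
// instead of re-resolving 'latest' and silently skipping messages.
const { ConsumerGroup } = require('kafka-node');

const consumer = new ConsumerGroup(
  {
    kafkaHost: 'localhost:9092',   // placeholder broker
    groupId: 'example-group',      // placeholder group id
    fromOffset: 'latest',
    commitOffsetsOnFirstJoin: true // new in 2.6.0; defaults to true
  },
  ['example-topic']                // placeholder topic
);

consumer.on('message', message => console.log(message.offset, message.value));
consumer.on('error', console.error);
```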
@@ -179,2 +179,6 @@ 'use strict';
       broker.socket.end();
+      setImmediate(function () {
+        broker.socket.destroy();
+        broker.socket.unref();
+      });
     });
@@ -190,6 +194,4 @@ };
-Client.prototype.sendFetchRequest = function (consumer, payloads, fetchMaxWaitMs, fetchMinBytes, maxTickMessages) {
-  var self = this;
-  var encoder = protocol.encodeFetchRequest(fetchMaxWaitMs, fetchMinBytes);
-  var decoder = protocol.decodeFetchResponse(function (err, type, message) {
+Client.prototype._createMessageHandler = function (consumer) {
+  return (err, type, message) => {
     if (err) {
@@ -199,3 +201,3 @@ if (err.message === 'OffsetOutOfRange') {
     } else if (err.message === 'NotLeaderForPartition' || err.message === 'UnknownTopicOrPartition') {
-      return self.emit('brokersChanged');
+      return this.emit('brokersChanged');
     }
@@ -217,4 +219,9 @@
     }
-  }, maxTickMessages);
+  };
 };
+
+Client.prototype.sendFetchRequest = function (consumer, payloads, fetchMaxWaitMs, fetchMinBytes, maxTickMessages) {
+  var encoder = protocol.encodeFetchRequest(fetchMaxWaitMs, fetchMinBytes);
+  var decoder = protocol.decodeFetchResponse(this._createMessageHandler(consumer), maxTickMessages);
+
   this.send(payloads, encoder, decoder, function (err) {
@@ -221,0 +228,0 @@ if (err) {
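The first client.js hunk above changes how broker sockets are torn down. A generic Node.js sketch of the same pattern — these are standard `net.Socket` methods, not kafka-node APIs:

```js
// end() flushes pending writes and sends FIN; deferring destroy() to the
// next turn of the event loop gives that final write a chance to go out
// before the handle is forcibly closed, and unref() ensures the socket no
// longer keeps the process alive.
function closeSocket (socket) {
  socket.end();
  setImmediate(function () {
    socket.destroy();
    socket.unref();
  });
}
```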
@@ -47,2 +47,3 @@ 'use strict';
   retryMinTimeout: 1000,
+  commitOffsetsOnFirstJoin: true,
   connectOnReady: true,
@@ -257,2 +258,6 @@ migrateHLC: false,
   protocol = _.find(this.protocols, { name: protocol });
+  if (!protocol) {
+    callback(new Error('Unknown group protocol: ' + protocol));
+    return;
+  }
@@ -285,2 +290,6 @@ var self = this;
   logger.debug('joinGroupResponse %j from %s', joinGroupResponse, this.client.clientId);
+  if (!joinGroupResponse.memberId || !joinGroupResponse.generationId) {
+    callback(new Error('Invalid joinGroupResponse: ' + JSON.stringify(joinGroupResponse)));
+    return;
+  }
@@ -327,2 +336,3 @@ this.isLeader = joinGroupResponse.leaderId === joinGroupResponse.memberId;
+  let noOffset;
   async.waterfall(
@@ -336,3 +346,3 @@ [
-      var noOffset = topicPartitionList.some(function (tp) {
+      noOffset = topicPartitionList.some(function (tp) {
         return offsets[tp.topic][tp.partition] === -1;
@@ -399,3 +409,9 @@ });
       });
-      callback(null, true);
+      if (noOffset && self.options.commitOffsetsOnFirstJoin) {
+        self.commit(true, (err) => {
+          callback(err, !err ? true : null);
+        });
+      } else {
+        callback(null, true);
+      }
     }
@@ -451,3 +467,8 @@ ],
   if (typeof self.options.onRebalance === 'function') {
-    self.options.onRebalance(self.generationId != null && self.memberId != null, callback);
+    self.options.onRebalance(self.generationId != null && self.memberId != null, function (error) {
+      if (error) {
+        return callback(error);
+      }
+      callback(null);
+    });
     return;
@@ -626,2 +647,25 @@ }
+ConsumerGroup.prototype.addTopics = function (topics, cb) {
+  topics = Array.isArray(topics) ? topics : [topics];
+  if (!this.client.ready) {
+    this.client.once('ready', () => this.addTopics(topics, cb));
+    return;
+  }
+  async.series([
+    callback => this.client.topicExists(topics, callback),
+    callback => (this.options.autoCommit && this.generationId != null && this.memberId)
+      ? this.commit(true, callback)
+      : callback(null),
+    callback => this.leaveGroup(callback),
+    callback => {
+      this.topics = this.topics.concat(topics);
+      this.setupProtocols(this.options.protocol);
+      this.connect();
+      callback(null);
+    }
+  ], error => (error ? cb(error) : cb(null, `Add Topics ${topics.join(',')} Successfully`)));
+};
+
 ConsumerGroup.prototype.close = function (force, cb) {
@@ -628,0 +672,0 @@ var self = this;
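A hedged sketch of the two `ConsumerGroup` behaviors changed above — `addTopics` (#914) and the `onRebalance` callback contract (#922). Host, group, and topic names are placeholders:

```js
const { ConsumerGroup } = require('kafka-node');

const group = new ConsumerGroup(
  {
    kafkaHost: 'localhost:9092', // placeholder
    groupId: 'example-group',    // placeholder
    // Per the fix above, onRebalance must *call* its callback (optionally
    // with an error); any value yielded to the callback is now discarded
    // instead of crashing the rebalance.
    onRebalance: (isAlreadyMember, callback) => {
      // e.g. flush in-flight work before partitions are reassigned
      callback();
    }
  },
  ['first-topic'] // placeholder
);

// addTopics accepts a string or an array. The group commits (if autoCommit
// is on and it has already joined), leaves, then reconnects with the wider
// subscription, triggering a rebalance.
group.on('connect', () => {
  group.addTopics('second-topic', (error, result) => {
    if (error) return console.error('addTopics failed', error);
    console.log(result); // "Add Topics second-topic Successfully"
  });
});
```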
@@ -68,2 +68,3 @@ 'use strict';
   this.consumerGroup.on('error', error => this.emit('error', error));
+  this.consumerGroup.on('connect', () => this.emit('connect'));
   this.consumerGroup.on('message', message => {
@@ -70,0 +71,0 @@ this.messageBuffer.push(message);
@@ -570,7 +570,13 @@ 'use strict';
 HighLevelConsumer.prototype.fetch = function () {
-  if (!this.ready || this.rebalancing || this.paused) {
+  if (!this.ready || this.rebalancing || this.paused || this.closing) {
     return;
   }
-  this.client.sendFetchRequest(this, this.topicPayloads, this.options.fetchMaxWaitMs, this.options.fetchMinBytes, this.options.maxTickMessages);
+  this.client.sendFetchRequest(
+    this,
+    this.topicPayloads,
+    this.options.fetchMaxWaitMs,
+    this.options.fetchMinBytes,
+    this.options.maxTickMessages
+  );
 };
@@ -577,0 +583,0 @@
@@ -129,25 +129,37 @@ 'use strict';
   logger.debug(`Connect attempt ${currentAttempt}`);
-  this.connectToBrokers(this.initialHosts, error => {
-    if (connect.retry(error)) {
-      return;
-    }
-    this.connecting = false;
+  async.series(
+    [
+      callback => {
+        this.connectToBrokers(this.initialHosts, callback);
+      },
-    if (error) {
-      logger.debug('exhausted retries. Main error', connect.mainError());
-      this.emit('error', connect.mainError());
-      return;
-    }
+      callback => {
+        this.loadMetadataForTopics([], (error, result) => {
+          if (error) {
+            logger.debug('loadMetadataForTopics after connect failed', error);
+            return callback(error);
+          }
+          this.updateMetadatas(result, true);
+          callback(null);
+        });
+      }
+    ],
+    error => {
+      if (connect.retry(error)) {
+        return;
+      }
-    this.loadMetadataForTopics([], (error, result) => {
+      this.connecting = false;
       if (error) {
-        logger.debug('loadMetadataForTopics after connect failed', error);
-        return this.emit('error', error);
+        logger.debug('exhausted retries. Main error', connect.mainError());
+        this.emit('error', connect.mainError());
+        return;
       }
-      this.updateMetadatas(result, true);
       this.ready = true;
       this.emit('ready');
-    });
-  });
+    }
+  );
 });
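Reduced to a standalone sketch, the refactor above moves both bootstrap steps inside one retry operation (kafka-node already depends on the `async` and `retry` packages; the function names below are illustrative stand-ins for `connectToBrokers` and `loadMetadataForTopics`). This is the shape of the #920 fix: a failed metadata request now schedules another attempt instead of ending the connect phase prematurely.

```js
const async = require('async');
const retry = require('retry');

function bootstrap (connectStep, metadataStep, onReady, onError) {
  const operation = retry.operation({ minTimeout: 1000 }); // placeholder settings
  operation.attempt(function () {
    async.series([connectStep, metadataStep], function (error) {
      if (operation.retry(error)) {
        return; // a retry was scheduled; this attempt is abandoned
      }
      if (error) {
        return onError(operation.mainError()); // retries exhausted
      }
      onReady(); // both steps succeeded
    });
  });
}
```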
@@ -293,3 +305,3 @@ };
 KafkaClient.prototype.refreshBrokerMetadata = function (callback) {
-  if (this.refreshingMetadata) {
+  if (this.refreshingMetadata || this.closing) {
     return;
@@ -554,2 +566,7 @@ }
   }
+  if (_.isEmpty(versions)) {
+    return callback(new Error(`getApiVersions response was empty for broker: ${broker}`));
+  }
   logger.debug('setting api support to %j', versions);
@@ -768,6 +785,12 @@ broker.apiSupport = versions;
+  if (!broker.isReady()) {
+    callback(new Error('Broker is not ready (apiSuppport is not set)'));
+    return;
+  }
   const coder = getSupportedForRequestType(broker, request.type);
   const encoder = request.data.args != null ? coder.encoder.apply(null, request.data.args) : coder.encoder;
-  const decoder = coder.decoder;
+  const decoder =
+    request.data.decoderArgs != null ? coder.decoder.apply(null, request.data.decoderArgs) : coder.decoder;
@@ -787,3 +810,3 @@ const requestData = encoder(this.clientId, correlationId, payload);
   const broker = this.brokerForLeader(leader);
-  if (broker.apiSupport == null) {
+  if (!broker.isReady()) {
     logger.debug('missing apiSupport waiting until broker is ready...');
@@ -849,2 +872,36 @@ this.waitUntilReady(broker, callback);
+KafkaClient.prototype.sendFetchRequest = function (
+  consumer,
+  payloads,
+  fetchMaxWaitMs,
+  fetchMinBytes,
+  maxTickMessages,
+  callback
+) {
+  if (callback == null) {
+    callback = _.noop;
+  }
+
+  async.series(
+    [
+      callback => {
+        this.verifyPayloadsHasLeaders(payloads, callback);
+      },
+      callback => {
+        const request = {
+          type: 'fetch',
+          data: {
+            payloads: payloads,
+            args: [fetchMaxWaitMs, fetchMinBytes],
+            decoderArgs: [this._createMessageHandler(consumer), maxTickMessages]
+          }
+        };
+        this.sendRequest(request, callback);
+      }
+    ],
+    callback
+  );
+};
+
 KafkaClient.prototype.sendProduceRequest = function (payloads, requireAcks, ackTimeoutMs, callback) {
@@ -851,0 +908,0 @@ async.series(
@@ -48,6 +48,7 @@ 'use strict';
 // Copyright The Obvious Corporation.
-KeyedPartitioner.prototype.hashCode = function (string) {
+KeyedPartitioner.prototype.hashCode = function (stringOrBuffer) {
   let hash = 0;
-  if (string) {
-    const length = string.toString().length;
+  if (stringOrBuffer) {
+    const string = stringOrBuffer.toString();
+    const length = string.length;
@@ -54,0 +55,0 @@ for (let i = 0; i < length; i++) {
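A sketch of what the partitioner change enables (#932): keys may now be Buffers, e.g. Avro-encoded key bytes. Host, topic, and the key bytes are placeholders:

```js
const { KafkaClient, HighLevelProducer } = require('kafka-node');

const client = new KafkaClient({ kafkaHost: 'localhost:9092' }); // placeholder
const producer = new HighLevelProducer(client, { partitionerType: 3 }); // 3 = keyed

producer.on('ready', () => {
  const key = Buffer.from([0x01, 0x02, 0x03]); // stand-in for Avro-encoded key bytes
  // hashCode() now calls toString() on the key once, so a Buffer key hashes
  // consistently to the same partition.
  producer.send([{ topic: 'example-topic', messages: 'message body', key: key }], (error, result) => {
    if (error) return console.error(error);
    console.log(result);
  });
});
```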
@@ -47,2 +47,14 @@ 'use strict';
+function encodeFetchRequestV1 (maxWaitMs, minBytes) {
+  return function encodeFetchRequest (clientId, correlationId, payloads) {
+    return _encodeFetchRequest(clientId, correlationId, payloads, maxWaitMs, minBytes, 1);
+  };
+}
+
+function encodeFetchRequestV2 (maxWaitMs, minBytes) {
+  return function encodeFetchRequest (clientId, correlationId, payloads) {
+    return _encodeFetchRequest(clientId, correlationId, payloads, maxWaitMs, minBytes, 2);
+  };
+}
+
 function decodeTopics (decodePartitions) {
@@ -61,5 +73,5 @@ return function (end, vars) {
-function _encodeFetchRequest (clientId, correlationId, payloads, maxWaitMs, minBytes) {
+function _encodeFetchRequest (clientId, correlationId, payloads, maxWaitMs, minBytes, version) {
   payloads = groupByTopic(payloads);
-  var request = encodeRequestHeader(clientId, correlationId, REQUEST_TYPE.fetch);
+  var request = encodeRequestHeader(clientId, correlationId, REQUEST_TYPE.fetch, version);
   var topics = Object.keys(payloads);
@@ -86,6 +98,12 @@
   return function (resp) {
-    return _decodeFetchResponse(resp, cb, maxTickMessages);
+    return _decodeFetchResponse(resp, cb, maxTickMessages, 0);
   };
 }
+
+function decodeFetchResponseV1 (cb, maxTickMessages) {
+  return function (resp) {
+    return _decodeFetchResponse(resp, cb, maxTickMessages, 1);
+  };
+}
+
 function createGroupError (errorCode) {
@@ -106,3 +124,3 @@ if (errorCode == null || errorCode === 0) {
-function _decodeFetchResponse (resp, cb, maxTickMessages) {
+function _decodeFetchResponse (resp, cb, maxTickMessages, version) {
   var topics = {};
@@ -112,2 +130,7 @@ Binary.parse(resp)
   .word32bs('correlationId')
+  .tap(function () {
+    if (version >= 1) {
+      this.word32bs('throttleTime');
+    }
+  })
   .word32bs('topicNum')
@@ -167,2 +190,3 @@ .loop(decodeTopics(decodePartitions));
   this.word64bs('timestamp');
+  cur += 8;
 }
@@ -1097,3 +1121,5 @@ })
   request.Int32BE(groups.length);
-  groups.forEach(groupId => { request.Int16BE(groupId.length).string(groupId); });
+  groups.forEach(groupId => {
+    request.Int16BE(groupId.length).string(groupId);
+  });
@@ -1106,7 +1132,3 @@ return encodeRequestWithLength(request.make());
-  Binary.parse(resp)
-    .word32bs('size')
-    .word32bs('correlationId')
-    .word32bs('describeNum')
-    .loop(decodeDescriptions);
+  Binary.parse(resp).word32bs('size').word32bs('correlationId').word32bs('describeNum').loop(decodeDescriptions);
@@ -1267,2 +1289,5 @@ function decodeDescriptions (end, vars) {
 exports.decodeFetchResponse = decodeFetchResponse;
+exports.encodeFetchRequestV1 = encodeFetchRequestV1;
+exports.decodeFetchResponseV1 = decodeFetchResponseV1;
+exports.encodeFetchRequestV2 = encodeFetchRequestV2;
@@ -1269,0 +1294,0 @@ exports.encodeOffsetCommitRequest = encodeOffsetCommitRequest;
@@ -12,3 +12,7 @@ 'use strict';
   ],
-  fetch: [[p.encodeFetchRequest, p.decodeFetchResponse]],
+  fetch: [
+    [p.encodeFetchRequest, p.decodeFetchResponse],
+    [p.encodeFetchRequestV1, p.decodeFetchResponseV1],
+    [p.encodeFetchRequestV2, p.decodeFetchResponseV1]
+  ],
   offset: [[p.encodeOffsetRequest, p.decodeOffsetResponse]],
@@ -45,2 +49,7 @@ metadata: [[p.encodeMetadataRequest, p.decodeMetadataResponse]],
 const API_SUPPORTED_IN_KAFKA_0_9 = {
+  fetch: {
+    min: 0,
+    max: 1,
+    usable: 1
+  },
   produce: {
@@ -47,0 +56,0 @@ min: 0,
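With the fetch API now registered up to v2, a `KafkaClient`-based consumer can surface producer timestamps against Kafka 0.10+ (#871). A minimal sketch; the host and topic are placeholders, and the `timestamp` field is only populated when the negotiated fetch version is at least 1:

```js
const { KafkaClient, Consumer } = require('kafka-node');

const client = new KafkaClient({ kafkaHost: 'localhost:9092' }); // placeholder
const consumer = new Consumer(client, [{ topic: 'example-topic', partition: 0 }], {});

consumer.on('message', message => {
  // With fetch v1+ negotiated and the Kafka 0.10+ message format, the
  // decoded message can carry the timestamp the producer wrote.
  console.log(message.offset, message.timestamp, message.value);
});
consumer.on('error', console.error);
```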
@@ -43,2 +43,6 @@ 'use strict';
+BrokerWrapper.prototype.isReady = function () {
+  return this.apiSupport != null;
+};
+
 BrokerWrapper.prototype.isIdle = function () {
@@ -57,2 +61,8 @@ return Date.now() - this._lastWrite >= this.idleConnectionMs;
+BrokerWrapper.prototype.toString = function () {
+  return `[${this.constructor.name} ${
+    this.socket.addr
+  } (connected: ${this.isConnected()}) (ready: ${this.isReady()}) (idle: ${this.isIdle()})]`;
+};
+
 module.exports = BrokerWrapper;
@@ -17,3 +17,3 @@ {
   "bugs": "https://github.com/SOHU-co/kafka-node/issues",
-  "version": "2.5.0",
+  "version": "2.6.0",
   "main": "kafka.js",
@@ -20,0 +20,0 @@ "license": "MIT",
@@ -58,2 +58,5 @@ Kafka-node
 * Connect directly to brokers (Kafka 0.9+)
+* Administrative APIs
+  * List Groups
+  * Describe Groups
@@ -90,2 +93,3 @@ # Install Kafka
 * `maxAsyncRequests` : maximum async operations at a time toward the kafka cluster. default: 10
+* `sslOptions`: **Object**, options to be passed to the tls broker sockets, ex. `{ rejectUnauthorized: false }` (Kafka 0.9+)
@@ -146,3 +150,3 @@ ### Example
   messages: ['message body'], // multiple messages should be an array, a single message can be just a string or a KeyedMessage instance
-  key: 'theKey', // only needed when using keyed partitioner
+  key: 'theKey', // string or buffer, only needed when using keyed partitioner
   partition: 0, // default 0
@@ -244,3 +248,3 @@ attributes: 2, // default: 0
   messages: ['message body'], // multiple messages should be an array, a single message can be just a string
-  key: 'theKey', // only needed when using keyed partitioner
+  key: 'theKey', // string or buffer, only needed when using keyed partitioner
   attributes: 1,
@@ -720,3 +724,3 @@ timestamp: Date.now() // <-- defaults to Date.now() (only available with kafka v0.10 and KafkaClient only)
   fromOffset: 'latest', // default
+  commitOffsetsOnFirstJoin: true, // on the very first time this consumer group subscribes to a topic, record the offset returned in fromOffset (latest/earliest)
   // how to recover from OutOfRangeOffset error (where saved offset is past server retention) accepts same values as fromOffset
@@ -723,0 +727,0 @@ outOfRangeOffset: 'earliest', // default