kafka-node
Advanced tools
Comparing version 4.1.3 to 5.0.0
# kafka-node CHANGELOG | ||
## 2019-11-04, Version 5.0.0 | ||
* Fix describe configs for multiple brokers [#1280](https://github.com/SOHU-Co/kafka-node/pull/1280) | ||
* Fix requestTimeout bug [#1255](https://github.com/SOHU-Co/kafka-node/pull/1255) | ||
* Improve consumer recovering from stalling when cluster redeploys [#1345](https://github.com/SOHU-Co/kafka-node/pull/1345) | ||
### BREAKING CHANGE | ||
* Dropped support for node 6 | ||
## 2019-04-30, Version 4.1.3 | ||
@@ -4,0 +12,0 @@ * Fix parseHost returning a string port instead of a number [#1257](https://github.com/SOHU-Co/kafka-node/pull/1257) |
@@ -115,3 +115,3 @@ 'use strict'; | ||
logger.debug('brokersChanged refreshing metadata'); | ||
self.client.refreshMetadata(self.topics, function (error) { | ||
self.client.refreshBrokerMetadata(function (error) { | ||
if (error) { | ||
@@ -118,0 +118,0 @@ self.emit('error', error); |
@@ -896,3 +896,3 @@ 'use strict'; | ||
if (!broker.isReady()) { | ||
logger.debug('missing apiSupport waiting until broker is ready...'); | ||
logger.debug('missing apiSupport waiting until broker is ready...(loadMetadataForTopics)'); | ||
this.waitUntilReady(broker, cb); | ||
@@ -997,4 +997,8 @@ } else { | ||
logger.debug('broker is now ready'); | ||
this._clearTimeout(timeoutId); | ||
timeoutId = null; | ||
if (timeoutId !== null) { | ||
this._clearTimeout(timeoutId); | ||
timeoutId = null; | ||
} | ||
callback(null); | ||
@@ -1006,7 +1010,9 @@ }; | ||
timeoutId = this._createTimeout(() => { | ||
this.removeListener(readyEventName, onReady); | ||
this._timeouts.delete(timeoutId); | ||
callback(new TimeoutError(`Request timed out after ${timeout}ms`)); | ||
}, timeout); | ||
if (timeout !== false) { | ||
timeoutId = this._createTimeout(() => { | ||
this.removeListener(readyEventName, onReady); | ||
this._timeouts.delete(timeoutId); | ||
callback(new TimeoutError(`Request timed out after ${timeout}ms`)); | ||
}, timeout); | ||
} | ||
@@ -1080,4 +1086,9 @@ this.once(readyEventName, onReady); | ||
const broker = this.brokerForLeader(leader, longpolling); | ||
if (!broker.isConnected()) { | ||
this.refreshBrokerMetadata(); | ||
callback(new errors.BrokerNotAvailableError('Broker not available (sendRequest -> ensureBrokerReady)')); | ||
return; | ||
} | ||
if (!broker.isReady()) { | ||
logger.debug('missing apiSupport waiting until broker is ready...'); | ||
logger.debug(`missing apiSupport waiting until broker is ready... (sendRequest ${request.type})`); | ||
this.waitUntilReady(broker, callback); | ||
@@ -1380,2 +1391,8 @@ } else { | ||
let err; | ||
// Broker resource requests must go to the specific node | ||
// other requests can go to any node | ||
const brokerResourceRequests = []; | ||
const nonBrokerResourceRequests = []; | ||
_.forEach(payload.resources, function (resource) { | ||
@@ -1388,37 +1405,45 @@ if (resourceTypeMap[resource.resourceType] === undefined) { | ||
} | ||
if (resource.resourceType === resourceTypeMap['broker']) { | ||
brokerResourceRequests.push(resource); | ||
} else { | ||
nonBrokerResourceRequests.push(resource); | ||
} | ||
}); | ||
if (err) { | ||
return callback(err); | ||
} | ||
const brokers = this.brokerMetadata; | ||
async.mapValuesLimit( | ||
brokers, | ||
this.options.maxAsyncRequests, | ||
(brokerMetadata, brokerId, cb) => { | ||
const broker = this.brokerForLeader(brokerId); | ||
if (!broker || !broker.isConnected()) { | ||
return cb(new errors.BrokerNotAvailableError('Broker not available (describeConfigs)')); | ||
} | ||
const correlationId = this.nextId(); | ||
let apiVersion = 0; | ||
if (broker.apiSupport && broker.apiSupport.describeConfigs) { | ||
apiVersion = broker.apiSupport.describeConfigs.max; | ||
async.parallelLimit([ | ||
(cb) => { | ||
if (nonBrokerResourceRequests.length > 0) { | ||
this.sendRequestToAnyBroker('describeConfigs', [{ resources: nonBrokerResourceRequests, includeSynonyms: payload.includeSynonyms }], cb); | ||
} else { | ||
cb(null, []); | ||
} | ||
apiVersion = Math.min(apiVersion, 2); | ||
const request = protocol.encodeDescribeConfigsRequest(this.clientId, correlationId, payload, apiVersion); | ||
this.sendWhenReady(broker, correlationId, request, protocol.decodeDescribeConfigsResponse(apiVersion), cb); | ||
}, | ||
(err, results) => { | ||
if (err) { | ||
callback(err); | ||
return; | ||
} | ||
results = _.values(results); | ||
callback(null, _.merge.apply({}, results)); | ||
...brokerResourceRequests.map(r => { | ||
return (cb) => { | ||
this.sendRequestToBroker(r.resourceName, 'describeConfigs', [{ resources: [r], includeSynonyms: payload.includeSynonyms }], cb); | ||
}; | ||
}) | ||
], this.options.maxAsyncRequests, (err, result) => { | ||
if (err) { | ||
return callback(err); | ||
} | ||
); | ||
callback(null, _.flatten(result)); | ||
}); | ||
}; | ||
/** | ||
* Sends a request to any broker in the cluster | ||
*/ | ||
KafkaClient.prototype.sendRequestToAnyBroker = function (requestType, args, callback) { | ||
// For now just select the first broker | ||
const brokerId = Object.keys(this.brokerMetadata)[0]; | ||
this.sendRequestToBroker(brokerId, requestType, args, callback); | ||
}; | ||
module.exports = KafkaClient; |
@@ -1671,3 +1671,15 @@ 'use strict'; | ||
function encodeDescribeConfigsRequest (clientId, correlationId, payload, apiVersion) { | ||
function encodeDescribeConfigsRequest (clientId, correlationId, payload) { | ||
return _encodeDescribeConfigsRequest(clientId, correlationId, payload, 0); | ||
} | ||
function encodeDescribeConfigsRequestV1 (clientId, correlationId, payload) { | ||
return _encodeDescribeConfigsRequest(clientId, correlationId, payload, 1); | ||
} | ||
function encodeDescribeConfigsRequestV2 (clientId, correlationId, payload) { | ||
return _encodeDescribeConfigsRequest(clientId, correlationId, payload, 2); | ||
} | ||
function _encodeDescribeConfigsRequest (clientId, correlationId, payload, apiVersion) { | ||
let request = encodeRequestHeader(clientId, correlationId, REQUEST_TYPE.describeConfigs, apiVersion); | ||
@@ -1696,8 +1708,14 @@ const resources = payload.resources; | ||
function decodeDescribeConfigsResponse (apiVersion) { | ||
return function (resp) { | ||
return _decodeDescribeConfigsResponse(resp, apiVersion); | ||
}; | ||
function decodeDescribeConfigsResponse (resp) { | ||
return _decodeDescribeConfigsResponse(resp, 0); | ||
} | ||
function decodeDescribeConfigsResponseV1 (resp) { | ||
return _decodeDescribeConfigsResponse(resp, 1); | ||
} | ||
function decodeDescribeConfigsResponseV2 (resp) { | ||
return _decodeDescribeConfigsResponse(resp, 2); | ||
} | ||
function _decodeDescribeConfigsResponse (resp, apiVersion) { | ||
@@ -1863,2 +1881,6 @@ let resources = []; | ||
exports.encodeDescribeConfigsRequest = encodeDescribeConfigsRequest; | ||
exports.encodeDescribeConfigsRequestV1 = encodeDescribeConfigsRequestV1; | ||
exports.encodeDescribeConfigsRequestV2 = encodeDescribeConfigsRequestV2; | ||
exports.decodeDescribeConfigsResponse = decodeDescribeConfigsResponse; | ||
exports.decodeDescribeConfigsResponseV1 = decodeDescribeConfigsResponseV1; | ||
exports.decodeDescribeConfigsResponseV2 = decodeDescribeConfigsResponseV2; |
@@ -53,3 +53,7 @@ 'use strict'; | ||
deleteTopics: null, | ||
describeConfigs: [[p.encodeDescribeConfigsRequest, p.decodeDescribeConfigsResponse]], | ||
describeConfigs: [ | ||
[p.encodeDescribeConfigsRequest, p.decodeDescribeConfigsResponse], | ||
[p.encodeDescribeConfigsRequestV1, p.decodeDescribeConfigsResponseV1], | ||
[p.encodeDescribeConfigsRequestV2, p.decodeDescribeConfigsResponseV2] | ||
], | ||
saslAuthenticate: [[p.encodeSaslAuthenticationRequest, p.decodeSaslAuthenticationResponse]] | ||
@@ -56,0 +60,0 @@ }; |
@@ -17,3 +17,3 @@ { | ||
"bugs": "https://github.com/SOHU-co/kafka-node/issues", | ||
"version": "4.1.3", | ||
"version": "5.0.0", | ||
"main": "kafka.js", | ||
@@ -38,3 +38,3 @@ "types": "types/index.d.ts", | ||
"engines": { | ||
"node": ">=6.4.0" | ||
"node": ">=8.5.1" | ||
}, | ||
@@ -41,0 +41,0 @@ "optionalDependencies": { |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Native code
Supply chain riskContains native code (e.g., compiled binaries or shared libraries). Including native code can obscure malicious behavior.
Found 1 instance in 1 package
6975
2
305494
62