Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

kafka-node

Package Overview
Dependencies
Maintainers
3
Versions
113
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

kafka-node - npm Package Compare versions

Comparing version 4.1.3 to 5.0.0

8

CHANGELOG.md
# kafka-node CHANGELOG
## 2019-11-04, Version 5.0.0
* Fix describe configs for multiple brokers [#1280](https://github.com/SOHU-Co/kafka-node/pull/1280)
* Fix requestTimeout bug [#1255](https://github.com/SOHU-Co/kafka-node/pull/1255)
* Improve consumer recovering from stalling when cluster redeploys [#1345](https://github.com/SOHU-Co/kafka-node/pull/1345)
### BREAKING CHANGE
* Dropped support for node 6
## 2019-04-30, Version 4.1.3

@@ -4,0 +12,0 @@ * Fix parseHost returning a string port instead of a number [#1257](https://github.com/SOHU-Co/kafka-node/pull/1257)

2

lib/consumerGroup.js

@@ -115,3 +115,3 @@ 'use strict';

logger.debug('brokersChanged refreshing metadata');
self.client.refreshMetadata(self.topics, function (error) {
self.client.refreshBrokerMetadata(function (error) {
if (error) {

@@ -118,0 +118,0 @@ self.emit('error', error);

@@ -896,3 +896,3 @@ 'use strict';

if (!broker.isReady()) {
logger.debug('missing apiSupport waiting until broker is ready...');
logger.debug('missing apiSupport waiting until broker is ready...(loadMetadataForTopics)');
this.waitUntilReady(broker, cb);

@@ -997,4 +997,8 @@ } else {

logger.debug('broker is now ready');
this._clearTimeout(timeoutId);
timeoutId = null;
if (timeoutId !== null) {
this._clearTimeout(timeoutId);
timeoutId = null;
}
callback(null);

@@ -1006,7 +1010,9 @@ };

timeoutId = this._createTimeout(() => {
this.removeListener(readyEventName, onReady);
this._timeouts.delete(timeoutId);
callback(new TimeoutError(`Request timed out after ${timeout}ms`));
}, timeout);
if (timeout !== false) {
timeoutId = this._createTimeout(() => {
this.removeListener(readyEventName, onReady);
this._timeouts.delete(timeoutId);
callback(new TimeoutError(`Request timed out after ${timeout}ms`));
}, timeout);
}

@@ -1080,4 +1086,9 @@ this.once(readyEventName, onReady);

const broker = this.brokerForLeader(leader, longpolling);
if (!broker.isConnected()) {
this.refreshBrokerMetadata();
callback(new errors.BrokerNotAvailableError('Broker not available (sendRequest -> ensureBrokerReady)'));
return;
}
if (!broker.isReady()) {
logger.debug('missing apiSupport waiting until broker is ready...');
logger.debug(`missing apiSupport waiting until broker is ready... (sendRequest ${request.type})`);
this.waitUntilReady(broker, callback);

@@ -1380,2 +1391,8 @@ } else {

let err;
// Broker resource requests must go to the specific node
// other requests can go to any node
const brokerResourceRequests = [];
const nonBrokerResourceRequests = [];
_.forEach(payload.resources, function (resource) {

@@ -1388,37 +1405,45 @@ if (resourceTypeMap[resource.resourceType] === undefined) {

}
if (resource.resourceType === resourceTypeMap['broker']) {
brokerResourceRequests.push(resource);
} else {
nonBrokerResourceRequests.push(resource);
}
});
if (err) {
return callback(err);
}
const brokers = this.brokerMetadata;
async.mapValuesLimit(
brokers,
this.options.maxAsyncRequests,
(brokerMetadata, brokerId, cb) => {
const broker = this.brokerForLeader(brokerId);
if (!broker || !broker.isConnected()) {
return cb(new errors.BrokerNotAvailableError('Broker not available (describeConfigs)'));
}
const correlationId = this.nextId();
let apiVersion = 0;
if (broker.apiSupport && broker.apiSupport.describeConfigs) {
apiVersion = broker.apiSupport.describeConfigs.max;
async.parallelLimit([
(cb) => {
if (nonBrokerResourceRequests.length > 0) {
this.sendRequestToAnyBroker('describeConfigs', [{ resources: nonBrokerResourceRequests, includeSynonyms: payload.includeSynonyms }], cb);
} else {
cb(null, []);
}
apiVersion = Math.min(apiVersion, 2);
const request = protocol.encodeDescribeConfigsRequest(this.clientId, correlationId, payload, apiVersion);
this.sendWhenReady(broker, correlationId, request, protocol.decodeDescribeConfigsResponse(apiVersion), cb);
},
(err, results) => {
if (err) {
callback(err);
return;
}
results = _.values(results);
callback(null, _.merge.apply({}, results));
...brokerResourceRequests.map(r => {
return (cb) => {
this.sendRequestToBroker(r.resourceName, 'describeConfigs', [{ resources: [r], includeSynonyms: payload.includeSynonyms }], cb);
};
})
], this.options.maxAsyncRequests, (err, result) => {
if (err) {
return callback(err);
}
);
callback(null, _.flatten(result));
});
};
/**
* Sends a request to any broker in the cluster
*/
KafkaClient.prototype.sendRequestToAnyBroker = function (requestType, args, callback) {
// For now just select the first broker
const brokerId = Object.keys(this.brokerMetadata)[0];
this.sendRequestToBroker(brokerId, requestType, args, callback);
};
module.exports = KafkaClient;

@@ -1671,3 +1671,15 @@ 'use strict';

function encodeDescribeConfigsRequest (clientId, correlationId, payload, apiVersion) {
function encodeDescribeConfigsRequest (clientId, correlationId, payload) {
return _encodeDescribeConfigsRequest(clientId, correlationId, payload, 0);
}
function encodeDescribeConfigsRequestV1 (clientId, correlationId, payload) {
return _encodeDescribeConfigsRequest(clientId, correlationId, payload, 1);
}
function encodeDescribeConfigsRequestV2 (clientId, correlationId, payload) {
return _encodeDescribeConfigsRequest(clientId, correlationId, payload, 2);
}
function _encodeDescribeConfigsRequest (clientId, correlationId, payload, apiVersion) {
let request = encodeRequestHeader(clientId, correlationId, REQUEST_TYPE.describeConfigs, apiVersion);

@@ -1696,8 +1708,14 @@ const resources = payload.resources;

function decodeDescribeConfigsResponse (apiVersion) {
return function (resp) {
return _decodeDescribeConfigsResponse(resp, apiVersion);
};
function decodeDescribeConfigsResponse (resp) {
return _decodeDescribeConfigsResponse(resp, 0);
}
function decodeDescribeConfigsResponseV1 (resp) {
return _decodeDescribeConfigsResponse(resp, 1);
}
function decodeDescribeConfigsResponseV2 (resp) {
return _decodeDescribeConfigsResponse(resp, 2);
}
function _decodeDescribeConfigsResponse (resp, apiVersion) {

@@ -1863,2 +1881,6 @@ let resources = [];

exports.encodeDescribeConfigsRequest = encodeDescribeConfigsRequest;
exports.encodeDescribeConfigsRequestV1 = encodeDescribeConfigsRequestV1;
exports.encodeDescribeConfigsRequestV2 = encodeDescribeConfigsRequestV2;
exports.decodeDescribeConfigsResponse = decodeDescribeConfigsResponse;
exports.decodeDescribeConfigsResponseV1 = decodeDescribeConfigsResponseV1;
exports.decodeDescribeConfigsResponseV2 = decodeDescribeConfigsResponseV2;

@@ -53,3 +53,7 @@ 'use strict';

deleteTopics: null,
describeConfigs: [[p.encodeDescribeConfigsRequest, p.decodeDescribeConfigsResponse]],
describeConfigs: [
[p.encodeDescribeConfigsRequest, p.decodeDescribeConfigsResponse],
[p.encodeDescribeConfigsRequestV1, p.decodeDescribeConfigsResponseV1],
[p.encodeDescribeConfigsRequestV2, p.decodeDescribeConfigsResponseV2]
],
saslAuthenticate: [[p.encodeSaslAuthenticationRequest, p.decodeSaslAuthenticationResponse]]

@@ -56,0 +60,0 @@ };

@@ -17,3 +17,3 @@ {

"bugs": "https://github.com/SOHU-co/kafka-node/issues",
"version": "4.1.3",
"version": "5.0.0",
"main": "kafka.js",

@@ -38,3 +38,3 @@ "types": "types/index.d.ts",

"engines": {
"node": ">=6.4.0"
"node": ">=8.5.1"
},

@@ -41,0 +41,0 @@ "optionalDependencies": {

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc