kafka-node
Advanced tools
Comparing version 2.3.2 to 2.4.0
# kafka-node CHANGELOG | ||
## 2018-01-16, Version 2.3.2 | ||
## 2018-02-06, Version 2.4.0 | ||
* Add compatibility to `Client` with broker configurations that separates external and internal traffic [#860](https://github.com/SOHU-Co/kafka-node/pull/860) | ||
* Fix issue where `updateMetadata()` wipes out entire topic metadata when only updating specific topics [#857](https://github.com/SOHU-Co/kafka-node/pull/857) | ||
## 2018-01-16, Version 2.3.2 | ||
* Fix issue with `ConsumerGroupStream` where lag stays at one and resuming the consumer re-reads the last read message [#850](https://github.com/SOHU-Co/kafka-node/pull/850) | ||
## 2018-01-07, Version 2.3.1 | ||
## 2018-01-07, Version 2.3.1 | ||
* Fix consumer example [#842](https://github.com/SOHU-Co/kafka-node/pull/842) | ||
@@ -9,0 +14,0 @@ * Fix issue where ConsumerGroupStream will autoCommit when the stream is explicitly closed [#843](https://github.com/SOHU-Co/kafka-node/pull/843) |
@@ -129,3 +129,3 @@ 'use strict'; | ||
var self = this; | ||
var protocol = self.ssl ? 'ssl:' : 'plaintext:'; | ||
var protocol = self.ssl ? 'SSL' : 'PLAINTEXT'; | ||
@@ -138,3 +138,9 @@ Object.keys(brokers).forEach(function (key) { | ||
var endpoint = _.find(brokerProfile.endpoints, function (endpoint) { | ||
return url.parse(endpoint).protocol === protocol; | ||
var securityProtocolMap = brokerProfile.listener_security_protocol_map; | ||
var listenerName = url.parse(endpoint).protocol.replace(':', '').toUpperCase(); | ||
if (securityProtocolMap !== undefined) { | ||
return securityProtocolMap[listenerName] === protocol; | ||
} else { | ||
return listenerName === protocol; | ||
} | ||
}); | ||
@@ -141,0 +147,0 @@ |
@@ -141,3 +141,3 @@ 'use strict'; | ||
} | ||
this.updateMetadatas(result); | ||
this.updateMetadatas(result, true); | ||
this.ready = true; | ||
@@ -305,3 +305,3 @@ this.emit('ready'); | ||
} | ||
this.updateMetadatas(result); | ||
this.updateMetadatas(result, true); | ||
this.refreshBrokers(); | ||
@@ -333,7 +333,11 @@ callback(error); | ||
KafkaClient.prototype.updateMetadatas = function (metadatas) { | ||
KafkaClient.prototype.updateMetadatas = function (metadatas, replaceTopicMetadata) { | ||
assert(metadatas && Array.isArray(metadatas) && metadatas.length === 2, 'metadata format is incorrect'); | ||
logger.debug('updating metadatas'); | ||
this.setBrokerMetadata(metadatas[0]); | ||
this.topicMetadata = metadatas[1].metadata; | ||
if (replaceTopicMetadata) { | ||
this.topicMetadata = metadatas[1].metadata; | ||
} else { | ||
_.extend(this.topicMetadata, metadatas[1].metadata); | ||
} | ||
}; | ||
@@ -425,3 +429,5 @@ | ||
const brokers = this.brokerMetadata; | ||
async.mapValuesLimit(brokers, this.options.maxAsyncRequests, | ||
async.mapValuesLimit( | ||
brokers, | ||
this.options.maxAsyncRequests, | ||
(brokerMetadata, brokerId, cb) => { | ||
@@ -437,3 +443,4 @@ const broker = this.brokerForLeader(brokerId); | ||
broker.write(request); | ||
}, (err, results) => { | ||
}, | ||
(err, results) => { | ||
if (err) { | ||
@@ -456,3 +463,5 @@ callback(err); | ||
async.groupByLimit(groups, this.options.maxAsyncRequests, | ||
async.groupByLimit( | ||
groups, | ||
this.options.maxAsyncRequests, | ||
(group, cb) => { | ||
@@ -469,3 +478,5 @@ this.sendGroupCoordinatorRequest(group, (err, coordinator) => { | ||
async.mapValuesLimit(results, this.options.maxAsyncRequests, | ||
async.mapValuesLimit( | ||
results, | ||
this.options.maxAsyncRequests, | ||
(groups, coordinator, cb) => { | ||
@@ -487,11 +498,20 @@ const broker = this.brokerForLeader(coordinator); | ||
callback(null, _.reduce(res, (result, describes, broker) => { | ||
_.each(describes, (values, consumer) => { | ||
result[consumer] = values; | ||
result[consumer].brokerId = broker; | ||
}); | ||
return result; | ||
}, {})); | ||
}); | ||
}); | ||
callback( | ||
null, | ||
_.reduce( | ||
res, | ||
(result, describes, broker) => { | ||
_.each(describes, (values, consumer) => { | ||
result[consumer] = values; | ||
result[consumer].brokerId = broker; | ||
}); | ||
return result; | ||
}, | ||
{} | ||
) | ||
); | ||
} | ||
); | ||
} | ||
); | ||
}; | ||
@@ -498,0 +518,0 @@ |
@@ -17,3 +17,3 @@ { | ||
"bugs": "https://github.com/SOHU-co/kafka-node/issues", | ||
"version": "2.3.2", | ||
"version": "2.4.0", | ||
"main": "kafka.js", | ||
@@ -20,0 +20,0 @@ "license": "MIT", |
License Policy Violation
License: This package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
License: This package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
286823
6535