Comparing version 1.11.0-beta.0 to 1.11.0-beta.1
{ | ||
"name": "kafkajs", | ||
"version": "1.11.0-beta.0", | ||
"version": "1.11.0-beta.1", | ||
"description": "A modern Apache Kafka client for node.js", | ||
@@ -81,5 +81,5 @@ "author": "Tulio Ornelas <ornelas.tulio@gmail.com>", | ||
"kafkajs": { | ||
"sha": "da9dc091b0af0506cedbad1403403d021d246de8", | ||
"compare": "https://github.com/tulios/kafkajs/compare/v1.10.0...da9dc091b0af0506cedbad1403403d021d246de8" | ||
"sha": "263ae478bc9bb91b904c3144aa16003dc0720cdd", | ||
"compare": "https://github.com/tulios/kafkajs/compare/v1.10.0...263ae478bc9bb91b904c3144aa16003dc0720cdd" | ||
} | ||
} |
@@ -8,2 +8,6 @@ const Broker = require('../broker') | ||
const { keys, assign, values } = Object | ||
const hasBrokerBeenReplaced = (broker, { host, port, rack }) => | ||
broker.connection.host !== host || | ||
broker.connection.port !== port || | ||
broker.connection.rack !== rack | ||
@@ -124,5 +128,10 @@ module.exports = class BrokerPool { | ||
const replacedBrokers = [] | ||
this.brokers = this.metadata.brokers.reduce((result, { nodeId, host, port, rack }) => { | ||
if (result[nodeId]) { | ||
return result | ||
if (!hasBrokerBeenReplaced(result[nodeId], { host, port, rack })) { | ||
return result | ||
} | ||
replacedBrokers.push(result[nodeId]) | ||
} | ||
@@ -132,2 +141,3 @@ | ||
this.seedBroker.nodeId = nodeId | ||
this.seedBroker.connection.rack = rack | ||
return assign(result, { | ||
@@ -160,3 +170,4 @@ [nodeId]: this.seedBroker, | ||
await Promise.all(brokerDisconnects) | ||
const replacedBrokersDisconnects = replacedBrokers.map(broker => broker.disconnect()) | ||
await Promise.all([...brokerDisconnects, ...replacedBrokersDisconnects]) | ||
} catch (e) { | ||
@@ -163,0 +174,0 @@ if (e.type === 'LEADER_NOT_AVAILABLE') { |
470906
13995