Comparing version
{ | ||
"name": "kafkajs", | ||
"version": "1.14.0-beta.8", | ||
"version": "1.14.0-beta.9", | ||
"description": "A modern Apache Kafka client for node.js", | ||
@@ -83,5 +83,5 @@ "author": "Tulio Ornelas <ornelas.tulio@gmail.com>", | ||
"kafkajs": { | ||
"sha": "7614eeeec75af3be3deb5f986e92f96ba0aeee82", | ||
"compare": "https://github.com/tulios/kafkajs/compare/v1.13.0...7614eeeec75af3be3deb5f986e92f96ba0aeee82" | ||
"sha": "6c553c855e42688f196aaf4b9054750ab7d3f2b9", | ||
"compare": "https://github.com/tulios/kafkajs/compare/v1.13.0...6c553c855e42688f196aaf4b9054750ab7d3f2b9" | ||
} | ||
} |
@@ -45,7 +45,2 @@ const Broker = require('../broker') | ||
this.seedBroker = this.createBroker({ | ||
connection: this.connectionBuilder.build(), | ||
logger: this.rootLogger, | ||
}) | ||
this.brokers = {} | ||
@@ -64,5 +59,19 @@ this.metadata = null | ||
const brokers = values(this.brokers) | ||
return !!brokers.find(broker => broker.isConnected()) || this.seedBroker.isConnected() | ||
return ( | ||
!!brokers.find(broker => broker.isConnected()) || | ||
(this.seedBroker ? this.seedBroker.isConnected() : false) | ||
) | ||
} | ||
async createSeedBroker() { | ||
if (this.seedBroker) { | ||
await this.seedBroker.disconnect() | ||
} | ||
this.seedBroker = this.createBroker({ | ||
connection: await this.connectionBuilder.build(), | ||
logger: this.rootLogger, | ||
}) | ||
} | ||
/** | ||
@@ -77,2 +86,6 @@ * @public | ||
if (!this.seedBroker) { | ||
await this.createSeedBroker() | ||
} | ||
return this.retrier(async (bail, retryCount, retryTime) => { | ||
@@ -85,6 +98,3 @@ try { | ||
// Connection builder will always rotate the seed broker | ||
this.seedBroker = this.createBroker({ | ||
connection: this.connectionBuilder.build(), | ||
logger: this.rootLogger, | ||
}) | ||
await this.createSeedBroker() | ||
this.logger.error( | ||
@@ -109,3 +119,3 @@ `Failed to connect to seed broker, trying another broker from the list: ${e.message}`, | ||
async disconnect() { | ||
await this.seedBroker.disconnect() | ||
this.seedBroker && (await this.seedBroker.disconnect()) | ||
await Promise.all(values(this.brokers).map(broker => broker.disconnect())) | ||
@@ -154,30 +164,36 @@ | ||
const replacedBrokers = [] | ||
this.brokers = this.metadata.brokers.reduce((result, { nodeId, host, port, rack }) => { | ||
if (result[nodeId]) { | ||
if (!hasBrokerBeenReplaced(result[nodeId], { host, port, rack })) { | ||
return result | ||
this.brokers = await this.metadata.brokers.reduce( | ||
async (resultPromise, { nodeId, host, port, rack }) => { | ||
const result = await resultPromise | ||
if (result[nodeId]) { | ||
if (!hasBrokerBeenReplaced(result[nodeId], { host, port, rack })) { | ||
return result | ||
} | ||
replacedBrokers.push(result[nodeId]) | ||
} | ||
replacedBrokers.push(result[nodeId]) | ||
} | ||
if (host === seedHost && port === seedPort) { | ||
this.seedBroker.nodeId = nodeId | ||
this.seedBroker.connection.rack = rack | ||
return assign(result, { | ||
[nodeId]: this.seedBroker, | ||
}) | ||
} | ||
if (host === seedHost && port === seedPort) { | ||
this.seedBroker.nodeId = nodeId | ||
this.seedBroker.connection.rack = rack | ||
return assign(result, { | ||
[nodeId]: this.seedBroker, | ||
[nodeId]: this.createBroker({ | ||
logger: this.rootLogger, | ||
versions: this.versions, | ||
supportAuthenticationProtocol: this.supportAuthenticationProtocol, | ||
connection: await this.connectionBuilder.build({ host, port, rack }), | ||
nodeId, | ||
}), | ||
}) | ||
} | ||
}, | ||
this.brokers | ||
) | ||
return assign(result, { | ||
[nodeId]: this.createBroker({ | ||
logger: this.rootLogger, | ||
versions: this.versions, | ||
supportAuthenticationProtocol: this.supportAuthenticationProtocol, | ||
connection: this.connectionBuilder.build({ host, port, rack }), | ||
nodeId, | ||
}), | ||
}) | ||
}, this.brokers) | ||
const freshBrokerIds = this.metadata.brokers.map(({ nodeId }) => `${nodeId}`).sort() | ||
@@ -312,3 +328,3 @@ const currentBrokerIds = keys(this.brokers).sort() | ||
// Rebuild the connection since it can't recover from illegal SASL state | ||
broker.connection = this.connectionBuilder.build({ | ||
broker.connection = await this.connectionBuilder.build({ | ||
host: broker.connection.host, | ||
@@ -315,0 +331,0 @@ port: broker.connection.port, |
const Connection = require('../network/connection') | ||
const { KafkaJSNonRetriableError } = require('../errors') | ||
const shuffle = require('../utils/shuffle') | ||
const { KafkaJSConnectionError, KafkaJSNonRetriableError } = require('../errors') | ||
const validateBrokers = brokers => { | ||
if (!brokers || brokers.length === 0) { | ||
throw new KafkaJSNonRetriableError(`Failed to connect: expected brokers array and got nothing`) | ||
} | ||
} | ||
module.exports = ({ | ||
@@ -25,15 +18,41 @@ socketFactory, | ||
}) => { | ||
validateBrokers(brokers) | ||
const shuffledBrokers = shuffle(brokers) | ||
const size = brokers.length | ||
let index = 0 | ||
const getBrokers = async () => { | ||
if (!brokers) { | ||
throw new KafkaJSNonRetriableError(`Failed to connect: brokers parameter should not be null`) | ||
} | ||
// static list | ||
if (Array.isArray(brokers)) { | ||
if (!brokers.length) { | ||
throw new KafkaJSNonRetriableError(`Failed to connect: brokers array is empty`) | ||
} | ||
return brokers | ||
} | ||
// dynamic brokers | ||
let list | ||
try { | ||
list = await brokers() | ||
} catch (e) { | ||
logger.error(e) | ||
throw new KafkaJSConnectionError(`Failed to connect: brokers function returned exception`) | ||
} | ||
if (!list || list.length === 0) { | ||
throw new KafkaJSConnectionError(`Failed to connect: brokers function returned nothing`) | ||
} | ||
return list | ||
} | ||
return { | ||
build: ({ host, port, rack } = {}) => { | ||
build: async ({ host, port, rack } = {}) => { | ||
if (!host) { | ||
// Always rotate the seed broker | ||
const [seedHost, seedPort] = shuffledBrokers[index++ % size].split(':') | ||
host = seedHost | ||
port = Number(seedPort) | ||
const list = await getBrokers() | ||
const randomBroker = list[index++ % list.length] | ||
host = randomBroker.split(':')[0] | ||
port = Number(randomBroker.split(':')[1]) | ||
} | ||
@@ -45,4 +64,4 @@ | ||
rack, | ||
sasl, | ||
ssl, | ||
sasl, | ||
clientId, | ||
@@ -49,0 +68,0 @@ socketFactory, |
@@ -17,4 +17,6 @@ /// <reference types="node" /> | ||
export type BrokersFunction = () => string[] | Promise<string[]>; | ||
export interface KafkaConfig { | ||
brokers: string[] | ||
brokers: string[] | BrokersFunction | ||
ssl?: tls.ConnectionOptions | boolean | ||
@@ -21,0 +23,0 @@ sasl?: SASLOptions |
578108
0.15%17026
0.16%