Comparing version 1.15.0-beta.24 to 1.15.0-beta.25
{ | ||
"name": "kafkajs", | ||
"version": "1.15.0-beta.24", | ||
"version": "1.15.0-beta.25", | ||
"description": "A modern Apache Kafka client for node.js", | ||
@@ -83,5 +83,5 @@ "author": "Tulio Ornelas <ornelas.tulio@gmail.com>", | ||
"kafkajs": { | ||
"sha": "b83c14b332a61716fc3553ec293ee6c34dd46d22", | ||
"compare": "https://github.com/tulios/kafkajs/compare/v1.14.0...b83c14b332a61716fc3553ec293ee6c34dd46d22" | ||
"sha": "d13acd58f693bef5f2460ade036757d6314e4710", | ||
"compare": "https://github.com/tulios/kafkajs/compare/v1.14.0...d13acd58f693bef5f2460ade036757d6314e4710" | ||
} | ||
} |
@@ -5,3 +5,3 @@ const Broker = require('../broker') | ||
const arrayDiff = require('../utils/arrayDiff') | ||
const { KafkaJSBrokerNotFound } = require('../errors') | ||
const { KafkaJSBrokerNotFound, KafkaJSProtocolError } = require('../errors') | ||
@@ -315,9 +315,11 @@ const { keys, assign, values } = Object | ||
await broker.disconnect() | ||
} | ||
// Connection refused means this node is down, or the cluster is restarting, | ||
// which requires metadata refresh to discover the new nodes | ||
if (e.code === 'ECONNREFUSED') { | ||
return bail(e) | ||
} | ||
// To avoid reconnecting to an unavailable host, we bail on connection errors | ||
// and refresh metadata on a higher level before reconnecting | ||
if (e.name === 'KafkaJSConnectionError') { | ||
return bail(e) | ||
} | ||
if (e.type === 'ILLEGAL_SASL_STATE') { | ||
// Rebuild the connection since it can't recover from illegal SASL state | ||
@@ -331,2 +333,3 @@ broker.connection = await this.connectionBuilder.build({ | ||
this.logger.error(`Failed to connect to broker, reconnecting`, { retryCount, retryTime }) | ||
throw new KafkaJSProtocolError(e, { retriable: true }) | ||
} | ||
@@ -333,0 +336,0 @@ |
@@ -219,3 +219,3 @@ const BrokerPool = require('./brokerPool') | ||
e.name === 'KafkaJSLockTimeout' || | ||
e.code === 'ECONNREFUSED' | ||
e.name === 'KafkaJSConnectionError' | ||
) { | ||
@@ -222,0 +222,0 @@ await this.refreshMetadata() |
@@ -22,4 +22,4 @@ const pkgJson = require('../package.json') | ||
class KafkaJSProtocolError extends KafkaJSError { | ||
constructor(e) { | ||
super(e, { retriable: e.retriable }) | ||
constructor(e, { retriable = e.retriable } = {}) { | ||
super(e, { retriable }) | ||
this.type = e.type | ||
@@ -26,0 +26,0 @@ this.code = e.code |
@@ -17,2 +17,3 @@ const createSendMessages = require('./sendMessages') | ||
cluster, | ||
retrier, | ||
partitioner, | ||
@@ -95,41 +96,7 @@ eosManager, | ||
return retrier(async (bail, retryCount, retryTime) => { | ||
try { | ||
return await sendMessages({ | ||
acks, | ||
timeout, | ||
compression, | ||
topicMessages: mergedTopicMessages, | ||
}) | ||
} catch (error) { | ||
if (error.name === 'KafkaJSConnectionClosedError') { | ||
cluster.removeBroker({ host: error.host, port: error.port }) | ||
} | ||
if (!cluster.isConnected()) { | ||
logger.debug(`Cluster has disconnected, reconnecting: ${error.message}`, { | ||
retryCount, | ||
retryTime, | ||
}) | ||
await cluster.connect() | ||
await cluster.refreshMetadata() | ||
throw error | ||
} | ||
// This is necessary in case the metadata is stale and the number of partitions | ||
// for this topic has increased in the meantime | ||
if ( | ||
error.name === 'KafkaJSConnectionError' || | ||
error.name === 'KafkaJSConnectionClosedError' || | ||
(error.name === 'KafkaJSProtocolError' && error.retriable) | ||
) { | ||
logger.error(`Failed to send messages: ${error.message}`, { retryCount, retryTime }) | ||
await cluster.refreshMetadata() | ||
throw error | ||
} | ||
// Skip retries for errors not related to the Kafka protocol | ||
logger.error(`${error.message}`, { retryCount, retryTime }) | ||
bail(error) | ||
} | ||
return await sendMessages({ | ||
acks, | ||
timeout, | ||
compression, | ||
topicMessages: mergedTopicMessages, | ||
}) | ||
@@ -136,0 +103,0 @@ } |
@@ -1,2 +0,1 @@ | ||
const createRetry = require('../retry') | ||
const flatten = require('../utils/flatten') | ||
@@ -10,13 +9,7 @@ const { KafkaJSMetadataNotLoaded } = require('../errors') | ||
const { keys } = Object | ||
const TOTAL_INDIVIDUAL_ATTEMPTS = 5 | ||
module.exports = ({ logger, cluster, partitioner, eosManager }) => { | ||
const retrier = createRetry({ retries: TOTAL_INDIVIDUAL_ATTEMPTS }) | ||
module.exports = ({ logger, cluster, partitioner, eosManager, retrier }) => { | ||
return async ({ acks, timeout, compression, topicMessages }) => { | ||
const responsePerBroker = new Map() | ||
const topics = topicMessages.map(({ topic }) => topic) | ||
await cluster.addMultipleTargetTopics(topics) | ||
const createProducerRequests = async responsePerBroker => { | ||
@@ -119,3 +112,6 @@ const topicMetadata = new Map() | ||
const makeRequests = async (bail, retryCount, retryTime) => { | ||
return retrier(async (bail, retryCount, retryTime) => { | ||
const topics = topicMessages.map(({ topic }) => topic) | ||
await cluster.addMultipleTargetTopics(topics) | ||
try { | ||
@@ -127,14 +123,36 @@ const requests = await createProducerRequests(responsePerBroker) | ||
} catch (e) { | ||
if (staleMetadata(e) || e.name === 'KafkaJSMetadataNotLoaded') { | ||
if (e.name === 'KafkaJSConnectionClosedError') { | ||
cluster.removeBroker({ host: e.host, port: e.port }) | ||
} | ||
if (!cluster.isConnected()) { | ||
logger.debug(`Cluster has disconnected, reconnecting: ${e.message}`, { | ||
retryCount, | ||
retryTime, | ||
}) | ||
await cluster.connect() | ||
await cluster.refreshMetadata() | ||
throw e | ||
} | ||
throw e | ||
// This is necessary in case the metadata is stale and the number of partitions | ||
// for this topic has increased in the meantime | ||
if ( | ||
staleMetadata(e) || | ||
e.name === 'KafkaJSMetadataNotLoaded' || | ||
e.name === 'KafkaJSConnectionError' || | ||
e.name === 'KafkaJSConnectionClosedError' || | ||
(e.name === 'KafkaJSProtocolError' && e.retriable) | ||
) { | ||
logger.error(`Failed to send messages: ${e.message}`, { retryCount, retryTime }) | ||
await cluster.refreshMetadata() | ||
throw e | ||
} | ||
logger.error(`${e.message}`, { retryCount, retryTime }) | ||
if (e.retriable) throw e | ||
bail(e) | ||
} | ||
} | ||
return retrier(makeRequests).catch(e => { | ||
throw e.originalError || e | ||
}) | ||
} | ||
} |
670874
19880