Comparing version 1.5.0-beta.3 to 1.5.0-beta.4
@@ -8,2 +8,7 @@ # Changelog | ||
## [1.5.0-beta.4] - 2019-02-28 | ||
### Fixed | ||
- Abort old transactions on protocol error `CONCURRENT_TRANSACTIONS` #299 | ||
## [1.5.0-beta.3] - 2019-02-20 | ||
@@ -14,3 +19,3 @@ | ||
- Add custom requestTimeout for JoinGroup v0 #293 | ||
- Fix propagation of custom retry configs #295 | ||
- Fix propagation of custom retry configs #295 | ||
@@ -17,0 +22,0 @@ ### Changed |
{ | ||
"name": "kafkajs", | ||
"version": "1.5.0-beta.3", | ||
"version": "1.5.0-beta.4", | ||
"description": "A modern Apache Kafka client for node.js", | ||
@@ -5,0 +5,0 @@ "author": "Tulio Ornelas <ornelas.tulio@gmail.com>", |
const flatten = require('../utils/flatten') | ||
const sleep = require('../utils/sleep') | ||
const websiteUrl = require('../utils/websiteUrl') | ||
const arrayDiff = require('../utils/arrayDiff') | ||
@@ -172,2 +173,6 @@ const OffsetManager = require('./offsetManager') | ||
topicsNotSubscribed, | ||
helpUrl: websiteUrl( | ||
'docs/faq', | ||
'why-am-i-receiving-messages-for-topics-i-m-not-subscribed-to' | ||
), | ||
}) | ||
@@ -174,0 +179,0 @@ |
@@ -0,1 +1,2 @@ | ||
const createRetry = require('../../retry') | ||
const { KafkaJSNonRetriableError } = require('../../errors') | ||
@@ -10,2 +11,13 @@ const COORDINATOR_TYPES = require('../../protocol/coordinatorTypes') | ||
const INT_32_MAX_VALUE = Math.pow(2, 32) | ||
const INIT_PRODUCER_RETRIABLE_PROTOCOL_ERRORS = [ | ||
'NOT_COORDINATOR_FOR_GROUP', | ||
'GROUP_COORDINATOR_NOT_AVAILABLE', | ||
'GROUP_LOAD_IN_PROGRESS', | ||
/** | ||
* The producer might have crashed and never committed the transaction; retry the | ||
* request so Kafka can abort the current transaction | ||
* @see https://github.com/apache/kafka/blob/201da0542726472d954080d54bc585b111aaf86f/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L1001-L1002 | ||
*/ | ||
'CONCURRENT_TRANSACTIONS', | ||
] | ||
@@ -26,2 +38,4 @@ /** | ||
const retrier = createRetry(cluster.retry) | ||
/** | ||
@@ -95,21 +109,41 @@ * Current producer ID | ||
*/ | ||
initProducerId: async () => { | ||
await cluster.refreshMetadataIfNecessary() | ||
async initProducerId() { | ||
return retrier(async (bail, retryCount, retryTime) => { | ||
try { | ||
await cluster.refreshMetadataIfNecessary() | ||
// If non-transactional we can request the PID from any broker | ||
const broker = await (transactional | ||
? findTransactionCoordinator() | ||
: cluster.findControllerBroker()) | ||
// If non-transactional we can request the PID from any broker | ||
const broker = await (transactional | ||
? findTransactionCoordinator() | ||
: cluster.findControllerBroker()) | ||
const result = await broker.initProducerId({ | ||
transactionalId: transactional ? transactionalId : undefined, | ||
transactionTimeout, | ||
}) | ||
const result = await broker.initProducerId({ | ||
transactionalId: transactional ? transactionalId : undefined, | ||
transactionTimeout, | ||
}) | ||
stateMachine.transitionTo(STATES.READY) | ||
producerId = result.producerId | ||
producerEpoch = result.producerEpoch | ||
producerSequence = {} | ||
stateMachine.transitionTo(STATES.READY) | ||
producerId = result.producerId | ||
producerEpoch = result.producerEpoch | ||
producerSequence = {} | ||
logger.debug('Initialized producer id & epoch', { producerId, producerEpoch }) | ||
logger.debug('Initialized producer id & epoch', { producerId, producerEpoch }) | ||
} catch (e) { | ||
if (INIT_PRODUCER_RETRIABLE_PROTOCOL_ERRORS.includes(e.type)) { | ||
if (e.type === 'CONCURRENT_TRANSACTIONS') { | ||
logger.debug('There is an ongoing transaction on this transactionId, retrying', { | ||
error: e.message, | ||
stack: e.stack, | ||
transactionalId, | ||
retryCount, | ||
retryTime, | ||
}) | ||
} | ||
throw e | ||
} | ||
bail(e) | ||
} | ||
}) | ||
}, | ||
@@ -309,2 +343,3 @@ | ||
}, | ||
/** | ||
@@ -311,0 +346,0 @@ * Transaction state guards |
383784
217
11345