Comparing version 0.6.2 to 0.6.3
@@ -8,2 +8,9 @@ # Changelog | ||
## [0.6.3] - 2017-12-11 | ||
### Added | ||
- Add error logs for user errors in `eachMessage` and `eachBatch` | ||
### Fixed | ||
- Recover from rebalance in progress when starting the consumer | ||
## [0.6.2] - 2017-12-11 | ||
@@ -10,0 +17,0 @@ ### Added |
{ | ||
"name": "kafkajs", | ||
"version": "0.6.2", | ||
"version": "0.6.3", | ||
"description": "A modern Apache Kafka client for node.js", | ||
@@ -5,0 +5,0 @@ "author": "Tulio Ornelas <ornelas.tulio@gmail.com>", |
const createRetry = require('../retry') | ||
const { KafkaJSError } = require('../errors') | ||
const isRebalancing = e => | ||
e.type === 'REBALANCE_IN_PROGRESS' || e.type === 'NOT_COORDINATOR_FOR_GROUP' | ||
module.exports = class Runner { | ||
@@ -26,4 +30,18 @@ constructor({ | ||
async join() { | ||
await this.consumerGroup.join() | ||
await this.consumerGroup.sync() | ||
return this.retrier(async (bail, retryCount, retryTime) => { | ||
try { | ||
await this.consumerGroup.join() | ||
await this.consumerGroup.sync() | ||
} catch (e) { | ||
if (isRebalancing(e)) { | ||
// Rebalance in progress isn't a retriable error since the consumer | ||
// has to go through find coordinator and join again before it can | ||
// actually retry. Throwing a retriable error to allow the retrier | ||
// to keep going | ||
throw new KafkaJSError('The group is rebalancing') | ||
} | ||
bail(e) | ||
} | ||
}) | ||
} | ||
@@ -89,2 +107,3 @@ | ||
} catch (e) { | ||
this.logger.error(`Error when calling eachMessage`, { stack: e.stack }) | ||
// In case of errors, commit the previously consumed offsets | ||
@@ -115,2 +134,3 @@ await this.consumerGroup.commitOffsets() | ||
} catch (e) { | ||
this.logger.error(`Error when calling eachBatch`, { stack: e.stack }) | ||
// eachBatch has a special resolveOffset which can be used | ||
@@ -175,3 +195,3 @@ // to keep track of the messages | ||
if (e.type === 'REBALANCE_IN_PROGRESS' || e.type === 'NOT_COORDINATOR_FOR_GROUP') { | ||
if (isRebalancing(e)) { | ||
this.logger.error('The group is rebalancing, re-joining', { | ||
@@ -178,0 +198,0 @@ groupId: this.consumerGroup.groupId, |
271280
5104