Comparing version 2.1.0-beta.7 to 2.1.0-beta.8
{ | ||
"name": "kafkajs", | ||
"version": "2.1.0-beta.7", | ||
"version": "2.1.0-beta.8", | ||
"description": "A modern Apache Kafka client for node.js", | ||
@@ -85,5 +85,5 @@ "author": "Tulio Ornelas <ornelas.tulio@gmail.com>", | ||
"kafkajs": { | ||
"sha": "ddf4f64923245ce2cf5716d5babd7e05eb890030", | ||
"compare": "https://github.com/tulios/kafkajs/compare/v2.0.2...ddf4f64923245ce2cf5716d5babd7e05eb890030" | ||
"sha": "51a4947cbe12856c051263e86c2daa2aedb431ab", | ||
"compare": "https://github.com/tulios/kafkajs/compare/v2.0.2...51a4947cbe12856c051263e86c2daa2aedb431ab" | ||
} | ||
} |
@@ -93,6 +93,21 @@ const { EventEmitter } = require('events') | ||
async scheduleFetchManager() { | ||
scheduleFetchManager() { | ||
if (!this.running) { | ||
this.consuming = false | ||
this.logger.info('consumer not running, exiting', { | ||
groupId: this.consumerGroup.groupId, | ||
memberId: this.consumerGroup.memberId, | ||
}) | ||
return | ||
} | ||
this.consuming = true | ||
while (this.running) { | ||
this.retrier(async (bail, retryCount, retryTime) => { | ||
if (!this.running) { | ||
return | ||
} | ||
try { | ||
@@ -114,3 +129,3 @@ await this.fetchManager.start() | ||
await this.consumerGroup.joinAndSync() | ||
continue | ||
return | ||
} | ||
@@ -127,12 +142,33 @@ | ||
await this.consumerGroup.joinAndSync() | ||
continue | ||
return | ||
} | ||
if (e.name === 'KafkaJSNotImplemented') { | ||
return bail(e) | ||
} | ||
if (e.name === 'KafkaJSConnectionError') { | ||
return bail(e) | ||
} | ||
this.logger.debug('Error while scheduling fetch manager, trying again...', { | ||
groupId: this.consumerGroup.groupId, | ||
memberId: this.consumerGroup.memberId, | ||
error: e.message, | ||
stack: e.stack, | ||
retryCount, | ||
retryTime, | ||
}) | ||
throw e | ||
} | ||
}) | ||
.then(() => { | ||
this.scheduleFetchManager() | ||
}) | ||
.catch(e => { | ||
this.onCrash(e) | ||
break | ||
} | ||
} | ||
this.consuming = false | ||
this.running = false | ||
this.consuming = false | ||
this.running = false | ||
}) | ||
} | ||
@@ -410,35 +446,3 @@ | ||
return this.retrier(async (bail, retryCount, retryTime) => { | ||
try { | ||
await onBatch(batch) | ||
} catch (e) { | ||
if (!this.running) { | ||
this.logger.debug('consumer not running, exiting', { | ||
error: e.message, | ||
groupId: this.consumerGroup.groupId, | ||
memberId: this.consumerGroup.memberId, | ||
}) | ||
return | ||
} | ||
if ( | ||
isRebalancing(e) || | ||
e.type === 'UNKNOWN_MEMBER_ID' || | ||
e.name === 'KafkaJSNotImplemented' | ||
) { | ||
return bail(e) | ||
} | ||
this.logger.debug('Error while fetching data, trying again...', { | ||
groupId: this.consumerGroup.groupId, | ||
memberId: this.consumerGroup.memberId, | ||
error: e.message, | ||
stack: e.stack, | ||
retryCount, | ||
retryTime, | ||
}) | ||
throw e | ||
} | ||
}) | ||
await onBatch(batch) | ||
} | ||
@@ -445,0 +449,0 @@ |
712097
20760