Comparing version
{ | ||
"name": "kafkajs", | ||
"version": "1.13.0-beta.23", | ||
"version": "1.13.0-beta.24", | ||
"description": "A modern Apache Kafka client for node.js", | ||
@@ -82,5 +82,5 @@ "author": "Tulio Ornelas <ornelas.tulio@gmail.com>", | ||
"kafkajs": { | ||
"sha": "ee93cf42864f22483c5918181bf035e65dd7818b", | ||
"compare": "https://github.com/tulios/kafkajs/compare/v1.12.0...ee93cf42864f22483c5918181bf035e65dd7818b" | ||
"sha": "b13567d66da541b7049a9f4e0e981d15e28158bc", | ||
"compare": "https://github.com/tulios/kafkajs/compare/v1.12.0...b13567d66da541b7049a9f4e0e981d15e28158bc" | ||
} | ||
} |
@@ -0,1 +1,2 @@ | ||
const EventEmitter = require('events') | ||
const Long = require('long') | ||
@@ -18,4 +19,6 @@ const createRetry = require('../retry') | ||
const isSameOffset = (offsetA, offsetB) => Long.fromValue(offsetA).equals(Long.fromValue(offsetB)) | ||
const CONSUMING_START = 'consuming-start' | ||
const CONSUMING_STOP = 'consuming-stop' | ||
module.exports = class Runner { | ||
module.exports = class Runner extends EventEmitter { | ||
constructor({ | ||
@@ -34,2 +37,3 @@ logger, | ||
}) { | ||
super() | ||
this.logger = logger.namespace('Runner') | ||
@@ -51,2 +55,13 @@ this.consumerGroup = consumerGroup | ||
get consuming() { | ||
return this._consuming | ||
} | ||
set consuming(value) { | ||
if (this._consuming !== value) { | ||
this._consuming = value | ||
this.emit(value ? CONSUMING_START : CONSUMING_STOP) | ||
} | ||
} | ||
async join() { | ||
@@ -141,16 +156,10 @@ const startJoin = Date.now() | ||
return new Promise(resolve => { | ||
const scheduleWait = () => { | ||
this.logger.debug('waiting for consumer to finish...', { | ||
groupId: this.consumerGroup.groupId, | ||
memberId: this.consumerGroup.memberId, | ||
}) | ||
setTimeout(() => (!this.consuming ? resolve() : scheduleWait()), 1000) | ||
} | ||
if (!this.consuming) { | ||
return resolve() | ||
} | ||
scheduleWait() | ||
this.logger.debug('waiting for consumer to finish...', { | ||
groupId: this.consumerGroup.groupId, | ||
memberId: this.consumerGroup.memberId, | ||
}) | ||
this.once(CONSUMING_STOP, () => resolve()) | ||
}) | ||
@@ -157,0 +166,0 @@ } |
513641
0.05%15205
0.07%