Comparing version
{ | ||
"name": "kafkajs", | ||
"version": "1.15.0-beta.15", | ||
"version": "1.15.0-beta.16", | ||
"description": "A modern Apache Kafka client for node.js", | ||
@@ -83,5 +83,5 @@ "author": "Tulio Ornelas <ornelas.tulio@gmail.com>", | ||
"kafkajs": { | ||
"sha": "0868189d7388a2868bbc241b1bc763f4f570d032", | ||
"compare": "https://github.com/tulios/kafkajs/compare/v1.14.0...0868189d7388a2868bbc241b1bc763f4f570d032" | ||
"sha": "81d04bcddb64b1805ac56c5a0d86984e364b2646", | ||
"compare": "https://github.com/tulios/kafkajs/compare/v1.14.0...81d04bcddb64b1805ac56c5a0d86984e364b2646" | ||
} | ||
} |
@@ -6,2 +6,3 @@ const flatten = require('../utils/flatten') | ||
const arrayDiff = require('../utils/arrayDiff') | ||
const createRetry = require('../retry') | ||
@@ -13,3 +14,3 @@ const OffsetManager = require('./offsetManager') | ||
const { | ||
events: { HEARTBEAT, CONNECT, RECEIVED_UNSUBSCRIBED_TOPICS }, | ||
events: { GROUP_JOIN, HEARTBEAT, CONNECT, RECEIVED_UNSUBSCRIBED_TOPICS }, | ||
} = require('./instrumentationEvents') | ||
@@ -35,4 +36,13 @@ const { MemberAssignment } = require('./assignerProtocol') | ||
const isRebalancing = e => | ||
e.type === 'REBALANCE_IN_PROGRESS' || e.type === 'NOT_COORDINATOR_FOR_GROUP' | ||
const PRIVATE = { | ||
JOIN: Symbol('private:ConsumerGroup:join'), | ||
SYNC: Symbol('private:ConsumerGroup:sync'), | ||
} | ||
module.exports = class ConsumerGroup { | ||
constructor({ | ||
retry, | ||
cluster, | ||
@@ -65,2 +75,3 @@ groupId, | ||
this.instrumentationEmitter = instrumentationEmitter | ||
this.retrier = createRetry(Object.assign({}, retry)) | ||
this.assigners = assigners | ||
@@ -113,3 +124,3 @@ this.sessionTimeout = sessionTimeout | ||
async join() { | ||
async [PRIVATE.JOIN]() { | ||
const { groupId, sessionTimeout, rebalanceTimeout } = this | ||
@@ -146,3 +157,3 @@ | ||
async sync() { | ||
async [PRIVATE.SYNC]() { | ||
let assignment = [] | ||
@@ -281,2 +292,40 @@ const { | ||
joinAndSync() { | ||
const startJoin = Date.now() | ||
return this.retrier(async bail => { | ||
try { | ||
await this[PRIVATE.JOIN]() | ||
await this[PRIVATE.SYNC]() | ||
const memberAssignment = this.assigned().reduce( | ||
(result, { topic, partitions }) => ({ ...result, [topic]: partitions }), | ||
{} | ||
) | ||
const payload = { | ||
groupId: this.groupId, | ||
memberId: this.memberId, | ||
leaderId: this.leaderId, | ||
isLeader: this.isLeader(), | ||
memberAssignment, | ||
groupProtocol: this.groupProtocol, | ||
duration: Date.now() - startJoin, | ||
} | ||
this.instrumentationEmitter.emit(GROUP_JOIN, payload) | ||
this.logger.info('Consumer has joined the group', payload) | ||
} catch (e) { | ||
if (isRebalancing(e)) { | ||
// Rebalance in progress isn't a retriable protocol error since the consumer | ||
// has to go through find coordinator and join again before it can | ||
// actually retry the operation. We wrap the original error in a retriable error | ||
// here instead in order to restart the join + sync sequence using the retrier. | ||
throw new KafkaJSError(e) | ||
} | ||
bail(e) | ||
} | ||
}) | ||
} | ||
resetOffset({ topic, partition }) { | ||
@@ -538,4 +587,3 @@ this.offsetManager.resetOffset({ topic, partition }) | ||
await this.cluster.refreshMetadata() | ||
await this.join() | ||
await this.sync() | ||
await this.joinAndSync() | ||
throw new KafkaJSError(e.message) | ||
@@ -552,4 +600,3 @@ } | ||
await this.join() | ||
await this.sync() | ||
await this.joinAndSync() | ||
} | ||
@@ -556,0 +603,0 @@ |
@@ -90,2 +90,3 @@ const Long = require('../utils/long') | ||
topicConfigurations: topics, | ||
retry, | ||
cluster, | ||
@@ -92,0 +93,0 @@ groupId, |
@@ -9,3 +9,3 @@ const EventEmitter = require('events') | ||
const { | ||
events: { GROUP_JOIN, FETCH, FETCH_START, START_BATCH_PROCESS, END_BATCH_PROCESS }, | ||
events: { FETCH, FETCH_START, START_BATCH_PROCESS, END_BATCH_PROCESS }, | ||
} = require('./instrumentationEvents') | ||
@@ -78,38 +78,4 @@ | ||
async join() { | ||
const startJoin = Date.now() | ||
return this.retrier(async (bail, retryCount, retryTime) => { | ||
try { | ||
await this.consumerGroup.join() | ||
await this.consumerGroup.sync() | ||
this.running = true | ||
const memberAssignment = this.consumerGroup | ||
.assigned() | ||
.reduce((result, { topic, partitions }) => ({ ...result, [topic]: partitions }), {}) | ||
const payload = { | ||
groupId: this.consumerGroup.groupId, | ||
memberId: this.consumerGroup.memberId, | ||
leaderId: this.consumerGroup.leaderId, | ||
isLeader: this.consumerGroup.isLeader(), | ||
memberAssignment, | ||
groupProtocol: this.consumerGroup.groupProtocol, | ||
duration: Date.now() - startJoin, | ||
} | ||
this.instrumentationEmitter.emit(GROUP_JOIN, payload) | ||
this.logger.info('Consumer has joined the group', payload) | ||
} catch (e) { | ||
if (isRebalancing(e)) { | ||
// Rebalance in progress isn't a retriable error since the consumer | ||
// has to go through find coordinator and join again before it can | ||
// actually retry. Throwing a retriable error to allow the retrier | ||
// to keep going | ||
throw new KafkaJSError('The group is rebalancing') | ||
} | ||
bail(e) | ||
} | ||
}) | ||
await this.consumerGroup.joinAndSync() | ||
this.running = true | ||
} | ||
@@ -536,3 +502,3 @@ | ||
bail(new KafkaJSError('The group is rebalancing')) | ||
bail(new KafkaJSError(e)) | ||
} | ||
@@ -552,3 +518,3 @@ | ||
bail(new KafkaJSError('The group is rebalancing')) | ||
bail(new KafkaJSError(e)) | ||
} | ||
@@ -555,0 +521,0 @@ |
670803
0.04%19885
0.06%