New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

kafkajs

Package Overview
Dependencies
Maintainers
2
Versions
299
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

kafkajs - npm Package Compare versions

Comparing version

to
1.15.0-beta.16

6

package.json
{
"name": "kafkajs",
"version": "1.15.0-beta.15",
"version": "1.15.0-beta.16",
"description": "A modern Apache Kafka client for node.js",

@@ -83,5 +83,5 @@ "author": "Tulio Ornelas <ornelas.tulio@gmail.com>",

"kafkajs": {
"sha": "0868189d7388a2868bbc241b1bc763f4f570d032",
"compare": "https://github.com/tulios/kafkajs/compare/v1.14.0...0868189d7388a2868bbc241b1bc763f4f570d032"
"sha": "81d04bcddb64b1805ac56c5a0d86984e364b2646",
"compare": "https://github.com/tulios/kafkajs/compare/v1.14.0...81d04bcddb64b1805ac56c5a0d86984e364b2646"
}
}

@@ -6,2 +6,3 @@ const flatten = require('../utils/flatten')

const arrayDiff = require('../utils/arrayDiff')
const createRetry = require('../retry')

@@ -13,3 +14,3 @@ const OffsetManager = require('./offsetManager')

const {
events: { HEARTBEAT, CONNECT, RECEIVED_UNSUBSCRIBED_TOPICS },
events: { GROUP_JOIN, HEARTBEAT, CONNECT, RECEIVED_UNSUBSCRIBED_TOPICS },
} = require('./instrumentationEvents')

@@ -35,4 +36,13 @@ const { MemberAssignment } = require('./assignerProtocol')

const isRebalancing = e =>
e.type === 'REBALANCE_IN_PROGRESS' || e.type === 'NOT_COORDINATOR_FOR_GROUP'
const PRIVATE = {
JOIN: Symbol('private:ConsumerGroup:join'),
SYNC: Symbol('private:ConsumerGroup:sync'),
}
module.exports = class ConsumerGroup {
constructor({
retry,
cluster,

@@ -65,2 +75,3 @@ groupId,

this.instrumentationEmitter = instrumentationEmitter
this.retrier = createRetry(Object.assign({}, retry))
this.assigners = assigners

@@ -113,3 +124,3 @@ this.sessionTimeout = sessionTimeout

async join() {
async [PRIVATE.JOIN]() {
const { groupId, sessionTimeout, rebalanceTimeout } = this

@@ -146,3 +157,3 @@

async sync() {
async [PRIVATE.SYNC]() {
let assignment = []

@@ -281,2 +292,40 @@ const {

joinAndSync() {
const startJoin = Date.now()
return this.retrier(async bail => {
try {
await this[PRIVATE.JOIN]()
await this[PRIVATE.SYNC]()
const memberAssignment = this.assigned().reduce(
(result, { topic, partitions }) => ({ ...result, [topic]: partitions }),
{}
)
const payload = {
groupId: this.groupId,
memberId: this.memberId,
leaderId: this.leaderId,
isLeader: this.isLeader(),
memberAssignment,
groupProtocol: this.groupProtocol,
duration: Date.now() - startJoin,
}
this.instrumentationEmitter.emit(GROUP_JOIN, payload)
this.logger.info('Consumer has joined the group', payload)
} catch (e) {
if (isRebalancing(e)) {
// Rebalance in progress isn't a retriable protocol error since the consumer
// has to go through find coordinator and join again before it can
// actually retry the operation. We wrap the original error in a retriable error
// here instead in order to restart the join + sync sequence using the retrier.
throw new KafkaJSError(e)
}
bail(e)
}
})
}
resetOffset({ topic, partition }) {

@@ -538,4 +587,3 @@ this.offsetManager.resetOffset({ topic, partition })

await this.cluster.refreshMetadata()
await this.join()
await this.sync()
await this.joinAndSync()
throw new KafkaJSError(e.message)

@@ -552,4 +600,3 @@ }

await this.join()
await this.sync()
await this.joinAndSync()
}

@@ -556,0 +603,0 @@

@@ -90,2 +90,3 @@ const Long = require('../utils/long')

topicConfigurations: topics,
retry,
cluster,

@@ -92,0 +93,0 @@ groupId,

@@ -9,3 +9,3 @@ const EventEmitter = require('events')

const {
events: { GROUP_JOIN, FETCH, FETCH_START, START_BATCH_PROCESS, END_BATCH_PROCESS },
events: { FETCH, FETCH_START, START_BATCH_PROCESS, END_BATCH_PROCESS },
} = require('./instrumentationEvents')

@@ -78,38 +78,4 @@

async join() {
const startJoin = Date.now()
return this.retrier(async (bail, retryCount, retryTime) => {
try {
await this.consumerGroup.join()
await this.consumerGroup.sync()
this.running = true
const memberAssignment = this.consumerGroup
.assigned()
.reduce((result, { topic, partitions }) => ({ ...result, [topic]: partitions }), {})
const payload = {
groupId: this.consumerGroup.groupId,
memberId: this.consumerGroup.memberId,
leaderId: this.consumerGroup.leaderId,
isLeader: this.consumerGroup.isLeader(),
memberAssignment,
groupProtocol: this.consumerGroup.groupProtocol,
duration: Date.now() - startJoin,
}
this.instrumentationEmitter.emit(GROUP_JOIN, payload)
this.logger.info('Consumer has joined the group', payload)
} catch (e) {
if (isRebalancing(e)) {
// Rebalance in progress isn't a retriable error since the consumer
// has to go through find coordinator and join again before it can
// actually retry. Throwing a retriable error to allow the retrier
// to keep going
throw new KafkaJSError('The group is rebalancing')
}
bail(e)
}
})
await this.consumerGroup.joinAndSync()
this.running = true
}

@@ -536,3 +502,3 @@

bail(new KafkaJSError('The group is rebalancing'))
bail(new KafkaJSError(e))
}

@@ -552,3 +518,3 @@

bail(new KafkaJSError('The group is rebalancing'))
bail(new KafkaJSError(e))
}

@@ -555,0 +521,0 @@