New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

kafkajs

Package Overview
Dependencies
Maintainers
2
Versions
299
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

kafkajs - npm Package Compare versions

Comparing version 0.6.2 to 0.6.3

7

CHANGELOG.md

@@ -8,2 +8,9 @@ # Changelog

## [0.6.3] - 2017-12-11
### Added
- Add error logs for user errors in `eachMessage` and `eachBatch`
### Fixed
- Recover from rebalance in progress when starting the consumer
## [0.6.2] - 2017-12-11

@@ -10,0 +17,0 @@ ### Added

2

package.json
{
"name": "kafkajs",
"version": "0.6.2",
"version": "0.6.3",
"description": "A modern Apache Kafka client for node.js",

@@ -5,0 +5,0 @@ "author": "Tulio Ornelas <ornelas.tulio@gmail.com>",

const createRetry = require('../retry')
const { KafkaJSError } = require('../errors')
const isRebalancing = e =>
e.type === 'REBALANCE_IN_PROGRESS' || e.type === 'NOT_COORDINATOR_FOR_GROUP'
module.exports = class Runner {

@@ -26,4 +30,18 @@ constructor({

async join() {
await this.consumerGroup.join()
await this.consumerGroup.sync()
return this.retrier(async (bail, retryCount, retryTime) => {
try {
await this.consumerGroup.join()
await this.consumerGroup.sync()
} catch (e) {
if (isRebalancing(e)) {
// Rebalance in progress isn't a retriable error since the consumer
// has to go through find coordinator and join again before it can
// actually retry. Throwing a retriable error to allow the retrier
// to keep going
throw new KafkaJSError('The group is rebalancing')
}
bail(e)
}
})
}

@@ -89,2 +107,3 @@

} catch (e) {
this.logger.error(`Error when calling eachMessage`, { stack: e.stack })
// In case of errors, commit the previously consumed offsets

@@ -115,2 +134,3 @@ await this.consumerGroup.commitOffsets()

} catch (e) {
this.logger.error(`Error when calling eachBatch`, { stack: e.stack })
// eachBatch has a special resolveOffset which can be used

@@ -175,3 +195,3 @@ // to keep track of the messages

if (e.type === 'REBALANCE_IN_PROGRESS' || e.type === 'NOT_COORDINATOR_FOR_GROUP') {
if (isRebalancing(e)) {
this.logger.error('The group is rebalancing, re-joining', {

@@ -178,0 +198,0 @@ groupId: this.consumerGroup.groupId,

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc