New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

kafkajs

Package Overview
Dependencies
Maintainers
2
Versions
299
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

kafkajs - npm Package Compare versions

Comparing version 1.8.0 to 1.8.1

4

CHANGELOG.md

@@ -8,2 +8,6 @@ # Changelog

## [1.8.1] - 2019-06-25
### Fixed
- Make sure runner has connected brokers and fresh metadata before it starts #404
## [1.8.0] - 2019-05-13

@@ -10,0 +14,0 @@ ### Added

2

package.json
{
"name": "kafkajs",
"version": "1.8.0",
"version": "1.8.1",
"description": "A modern Apache Kafka client for node.js",

@@ -5,0 +5,0 @@ "author": "Tulio Ornelas <ornelas.tulio@gmail.com>",

@@ -184,3 +184,7 @@ const BrokerPool = require('./brokerPool')

// The client probably has stale metadata
if (e.name === 'KafkaJSLockTimeout' || e.code === 'ECONNREFUSED') {
if (
e.name === 'KafkaJSBrokerNotFound' ||
e.name === 'KafkaJSLockTimeout' ||
e.code === 'ECONNREFUSED'
) {
await this.refreshMetadata()

@@ -187,0 +191,0 @@ }

@@ -84,2 +84,7 @@ const flatten = require('../utils/flatten')

async connect() {
await this.cluster.connect()
await this.cluster.refreshMetadataIfNecessary()
}
async join() {

@@ -86,0 +91,0 @@ const { groupId, sessionTimeout, rebalanceTimeout } = this

@@ -253,16 +253,26 @@ const Long = require('long')

const coordinator = await this.getCoordinator()
await coordinator.offsetCommit(payload)
this.instrumentationEmitter.emit(COMMIT_OFFSETS, payload)
try {
const coordinator = await this.getCoordinator()
await coordinator.offsetCommit(payload)
this.instrumentationEmitter.emit(COMMIT_OFFSETS, payload)
// Update local reference of committed offsets
topics.forEach(({ topic, partitions }) => {
const updatedOffsets = partitions.reduce(
(obj, { partition, offset }) => assign(obj, { [partition]: offset }),
{}
)
assign(this.committedOffsets()[topic], updatedOffsets)
})
// Update local reference of committed offsets
topics.forEach(({ topic, partitions }) => {
const updatedOffsets = partitions.reduce(
(obj, { partition, offset }) => assign(obj, { [partition]: offset }),
{}
)
assign(this.committedOffsets()[topic], updatedOffsets)
})
this.lastCommit = Date.now()
this.lastCommit = Date.now()
} catch (e) {
// metadata is stale, the coordinator has changed due to a restart or
// broker reassignment
if (e.type === 'NOT_COORDINATOR_FOR_GROUP') {
await this.cluster.refreshMetadata()
}
throw e
}
}

@@ -269,0 +279,0 @@

@@ -86,2 +86,3 @@ const createRetry = require('../retry')

try {
await this.consumerGroup.connect()
await this.join()

@@ -88,0 +89,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc