New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

kafkajs

Package Overview
Dependencies
Maintainers
2
Versions
299
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

kafkajs - npm Package Compare versions

Comparing version 1.15.0-beta.24 to 1.15.0-beta.25

6

package.json
{
"name": "kafkajs",
"version": "1.15.0-beta.24",
"version": "1.15.0-beta.25",
"description": "A modern Apache Kafka client for node.js",

@@ -83,5 +83,5 @@ "author": "Tulio Ornelas <ornelas.tulio@gmail.com>",

"kafkajs": {
"sha": "b83c14b332a61716fc3553ec293ee6c34dd46d22",
"compare": "https://github.com/tulios/kafkajs/compare/v1.14.0...b83c14b332a61716fc3553ec293ee6c34dd46d22"
"sha": "d13acd58f693bef5f2460ade036757d6314e4710",
"compare": "https://github.com/tulios/kafkajs/compare/v1.14.0...d13acd58f693bef5f2460ade036757d6314e4710"
}
}

@@ -5,3 +5,3 @@ const Broker = require('../broker')

const arrayDiff = require('../utils/arrayDiff')
const { KafkaJSBrokerNotFound } = require('../errors')
const { KafkaJSBrokerNotFound, KafkaJSProtocolError } = require('../errors')

@@ -315,9 +315,11 @@ const { keys, assign, values } = Object

await broker.disconnect()
}
// Connection refused means this node is down, or the cluster is restarting,
// which requires metadata refresh to discover the new nodes
if (e.code === 'ECONNREFUSED') {
return bail(e)
}
// To avoid reconnecting to an unavailable host, we bail on connection errors
// and refresh metadata on a higher level before reconnecting
if (e.name === 'KafkaJSConnectionError') {
return bail(e)
}
if (e.type === 'ILLEGAL_SASL_STATE') {
// Rebuild the connection since it can't recover from illegal SASL state

@@ -331,2 +333,3 @@ broker.connection = await this.connectionBuilder.build({

this.logger.error(`Failed to connect to broker, reconnecting`, { retryCount, retryTime })
throw new KafkaJSProtocolError(e, { retriable: true })
}

@@ -333,0 +336,0 @@

@@ -219,3 +219,3 @@ const BrokerPool = require('./brokerPool')

e.name === 'KafkaJSLockTimeout' ||
e.code === 'ECONNREFUSED'
e.name === 'KafkaJSConnectionError'
) {

@@ -222,0 +222,0 @@ await this.refreshMetadata()

@@ -22,4 +22,4 @@ const pkgJson = require('../package.json')

class KafkaJSProtocolError extends KafkaJSError {
constructor(e) {
super(e, { retriable: e.retriable })
constructor(e, { retriable = e.retriable } = {}) {
super(e, { retriable })
this.type = e.type

@@ -26,0 +26,0 @@ this.code = e.code

@@ -17,2 +17,3 @@ const createSendMessages = require('./sendMessages')

cluster,
retrier,
partitioner,

@@ -95,41 +96,7 @@ eosManager,

return retrier(async (bail, retryCount, retryTime) => {
try {
return await sendMessages({
acks,
timeout,
compression,
topicMessages: mergedTopicMessages,
})
} catch (error) {
if (error.name === 'KafkaJSConnectionClosedError') {
cluster.removeBroker({ host: error.host, port: error.port })
}
if (!cluster.isConnected()) {
logger.debug(`Cluster has disconnected, reconnecting: ${error.message}`, {
retryCount,
retryTime,
})
await cluster.connect()
await cluster.refreshMetadata()
throw error
}
// This is necessary in case the metadata is stale and the number of partitions
// for this topic has increased in the meantime
if (
error.name === 'KafkaJSConnectionError' ||
error.name === 'KafkaJSConnectionClosedError' ||
(error.name === 'KafkaJSProtocolError' && error.retriable)
) {
logger.error(`Failed to send messages: ${error.message}`, { retryCount, retryTime })
await cluster.refreshMetadata()
throw error
}
// Skip retries for errors not related to the Kafka protocol
logger.error(`${error.message}`, { retryCount, retryTime })
bail(error)
}
return await sendMessages({
acks,
timeout,
compression,
topicMessages: mergedTopicMessages,
})

@@ -136,0 +103,0 @@ }

@@ -1,2 +0,1 @@

const createRetry = require('../retry')
const flatten = require('../utils/flatten')

@@ -10,13 +9,7 @@ const { KafkaJSMetadataNotLoaded } = require('../errors')

const { keys } = Object
const TOTAL_INDIVIDUAL_ATTEMPTS = 5
module.exports = ({ logger, cluster, partitioner, eosManager }) => {
const retrier = createRetry({ retries: TOTAL_INDIVIDUAL_ATTEMPTS })
module.exports = ({ logger, cluster, partitioner, eosManager, retrier }) => {
return async ({ acks, timeout, compression, topicMessages }) => {
const responsePerBroker = new Map()
const topics = topicMessages.map(({ topic }) => topic)
await cluster.addMultipleTargetTopics(topics)
const createProducerRequests = async responsePerBroker => {

@@ -119,3 +112,6 @@ const topicMetadata = new Map()

const makeRequests = async (bail, retryCount, retryTime) => {
return retrier(async (bail, retryCount, retryTime) => {
const topics = topicMessages.map(({ topic }) => topic)
await cluster.addMultipleTargetTopics(topics)
try {

@@ -127,14 +123,36 @@ const requests = await createProducerRequests(responsePerBroker)

} catch (e) {
if (staleMetadata(e) || e.name === 'KafkaJSMetadataNotLoaded') {
if (e.name === 'KafkaJSConnectionClosedError') {
cluster.removeBroker({ host: e.host, port: e.port })
}
if (!cluster.isConnected()) {
logger.debug(`Cluster has disconnected, reconnecting: ${e.message}`, {
retryCount,
retryTime,
})
await cluster.connect()
await cluster.refreshMetadata()
throw e
}
throw e
// This is necessary in case the metadata is stale and the number of partitions
// for this topic has increased in the meantime
if (
staleMetadata(e) ||
e.name === 'KafkaJSMetadataNotLoaded' ||
e.name === 'KafkaJSConnectionError' ||
e.name === 'KafkaJSConnectionClosedError' ||
(e.name === 'KafkaJSProtocolError' && e.retriable)
) {
logger.error(`Failed to send messages: ${e.message}`, { retryCount, retryTime })
await cluster.refreshMetadata()
throw e
}
logger.error(`${e.message}`, { retryCount, retryTime })
if (e.retriable) throw e
bail(e)
}
}
return retrier(makeRequests).catch(e => {
throw e.originalError || e
})
}
}
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc