New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

kafkajs

Package Overview
Dependencies
Maintainers
2
Versions
299
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

kafkajs - npm Package Compare versions

Comparing version

to
1.14.0-beta.9

6

package.json
{
"name": "kafkajs",
"version": "1.14.0-beta.8",
"version": "1.14.0-beta.9",
"description": "A modern Apache Kafka client for node.js",

@@ -83,5 +83,5 @@ "author": "Tulio Ornelas <ornelas.tulio@gmail.com>",

"kafkajs": {
"sha": "7614eeeec75af3be3deb5f986e92f96ba0aeee82",
"compare": "https://github.com/tulios/kafkajs/compare/v1.13.0...7614eeeec75af3be3deb5f986e92f96ba0aeee82"
"sha": "6c553c855e42688f196aaf4b9054750ab7d3f2b9",
"compare": "https://github.com/tulios/kafkajs/compare/v1.13.0...6c553c855e42688f196aaf4b9054750ab7d3f2b9"
}
}

@@ -45,7 +45,2 @@ const Broker = require('../broker')

this.seedBroker = this.createBroker({
connection: this.connectionBuilder.build(),
logger: this.rootLogger,
})
this.brokers = {}

@@ -64,5 +59,19 @@ this.metadata = null

const brokers = values(this.brokers)
return !!brokers.find(broker => broker.isConnected()) || this.seedBroker.isConnected()
return (
!!brokers.find(broker => broker.isConnected()) ||
(this.seedBroker ? this.seedBroker.isConnected() : false)
)
}
async createSeedBroker() {
if (this.seedBroker) {
await this.seedBroker.disconnect()
}
this.seedBroker = this.createBroker({
connection: await this.connectionBuilder.build(),
logger: this.rootLogger,
})
}
/**

@@ -77,2 +86,6 @@ * @public

if (!this.seedBroker) {
await this.createSeedBroker()
}
return this.retrier(async (bail, retryCount, retryTime) => {

@@ -85,6 +98,3 @@ try {

// Connection builder will always rotate the seed broker
this.seedBroker = this.createBroker({
connection: this.connectionBuilder.build(),
logger: this.rootLogger,
})
await this.createSeedBroker()
this.logger.error(

@@ -109,3 +119,3 @@ `Failed to connect to seed broker, trying another broker from the list: ${e.message}`,

async disconnect() {
await this.seedBroker.disconnect()
this.seedBroker && (await this.seedBroker.disconnect())
await Promise.all(values(this.brokers).map(broker => broker.disconnect()))

@@ -154,30 +164,36 @@

const replacedBrokers = []
this.brokers = this.metadata.brokers.reduce((result, { nodeId, host, port, rack }) => {
if (result[nodeId]) {
if (!hasBrokerBeenReplaced(result[nodeId], { host, port, rack })) {
return result
this.brokers = await this.metadata.brokers.reduce(
async (resultPromise, { nodeId, host, port, rack }) => {
const result = await resultPromise
if (result[nodeId]) {
if (!hasBrokerBeenReplaced(result[nodeId], { host, port, rack })) {
return result
}
replacedBrokers.push(result[nodeId])
}
replacedBrokers.push(result[nodeId])
}
if (host === seedHost && port === seedPort) {
this.seedBroker.nodeId = nodeId
this.seedBroker.connection.rack = rack
return assign(result, {
[nodeId]: this.seedBroker,
})
}
if (host === seedHost && port === seedPort) {
this.seedBroker.nodeId = nodeId
this.seedBroker.connection.rack = rack
return assign(result, {
[nodeId]: this.seedBroker,
[nodeId]: this.createBroker({
logger: this.rootLogger,
versions: this.versions,
supportAuthenticationProtocol: this.supportAuthenticationProtocol,
connection: await this.connectionBuilder.build({ host, port, rack }),
nodeId,
}),
})
}
},
this.brokers
)
return assign(result, {
[nodeId]: this.createBroker({
logger: this.rootLogger,
versions: this.versions,
supportAuthenticationProtocol: this.supportAuthenticationProtocol,
connection: this.connectionBuilder.build({ host, port, rack }),
nodeId,
}),
})
}, this.brokers)
const freshBrokerIds = this.metadata.brokers.map(({ nodeId }) => `${nodeId}`).sort()

@@ -312,3 +328,3 @@ const currentBrokerIds = keys(this.brokers).sort()

// Rebuild the connection since it can't recover from illegal SASL state
broker.connection = this.connectionBuilder.build({
broker.connection = await this.connectionBuilder.build({
host: broker.connection.host,

@@ -315,0 +331,0 @@ port: broker.connection.port,

const Connection = require('../network/connection')
const { KafkaJSNonRetriableError } = require('../errors')
const shuffle = require('../utils/shuffle')
const { KafkaJSConnectionError, KafkaJSNonRetriableError } = require('../errors')
const validateBrokers = brokers => {
if (!brokers || brokers.length === 0) {
throw new KafkaJSNonRetriableError(`Failed to connect: expected brokers array and got nothing`)
}
}
module.exports = ({

@@ -25,15 +18,41 @@ socketFactory,

}) => {
validateBrokers(brokers)
const shuffledBrokers = shuffle(brokers)
const size = brokers.length
let index = 0
const getBrokers = async () => {
if (!brokers) {
throw new KafkaJSNonRetriableError(`Failed to connect: brokers parameter should not be null`)
}
// static list
if (Array.isArray(brokers)) {
if (!brokers.length) {
throw new KafkaJSNonRetriableError(`Failed to connect: brokers array is empty`)
}
return brokers
}
// dynamic brokers
let list
try {
list = await brokers()
} catch (e) {
logger.error(e)
throw new KafkaJSConnectionError(`Failed to connect: brokers function returned exception`)
}
if (!list || list.length === 0) {
throw new KafkaJSConnectionError(`Failed to connect: brokers function returned nothing`)
}
return list
}
return {
build: ({ host, port, rack } = {}) => {
build: async ({ host, port, rack } = {}) => {
if (!host) {
// Always rotate the seed broker
const [seedHost, seedPort] = shuffledBrokers[index++ % size].split(':')
host = seedHost
port = Number(seedPort)
const list = await getBrokers()
const randomBroker = list[index++ % list.length]
host = randomBroker.split(':')[0]
port = Number(randomBroker.split(':')[1])
}

@@ -45,4 +64,4 @@

rack,
sasl,
ssl,
sasl,
clientId,

@@ -49,0 +68,0 @@ socketFactory,

@@ -17,4 +17,6 @@ /// <reference types="node" />

export type BrokersFunction = () => string[] | Promise<string[]>;
export interface KafkaConfig {
brokers: string[]
brokers: string[] | BrokersFunction
ssl?: tls.ConnectionOptions | boolean

@@ -21,0 +23,0 @@ sasl?: SASLOptions