New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

kafkajs

Package Overview
Dependencies
Maintainers
2
Versions
299
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

kafkajs - npm Package Compare versions

Comparing version

to
1.15.0-beta.6

src/protocol/configResourceTypes.js

8

index.js

@@ -7,2 +7,3 @@ const Kafka = require('./src')

const ResourceTypes = require('./src/protocol/resourceTypes')
const ConfigResourceTypes = require('./src/protocol/configResourceTypes')
const { LEVELS } = require('./src/loggers')

@@ -18,3 +19,10 @@

CompressionCodecs: Compression.Codecs,
/**
* @deprecated
* @see https://github.com/tulios/kafkajs/issues/649
*
* Use ConfigResourceTypes instead
*/
ResourceTypes,
ConfigResourceTypes,
}

6

package.json
{
"name": "kafkajs",
"version": "1.15.0-beta.5",
"version": "1.15.0-beta.6",
"description": "A modern Apache Kafka client for node.js",

@@ -83,5 +83,5 @@ "author": "Tulio Ornelas <ornelas.tulio@gmail.com>",

"kafkajs": {
"sha": "0cf924200380b81ab266e6b5c08ad951052c15a7",
"compare": "https://github.com/tulios/kafkajs/compare/v1.14.0...0cf924200380b81ab266e6b5c08ad951052c15a7"
"sha": "f9d7dfec0f4d3735d6ba459795582116bd6567f6",
"compare": "https://github.com/tulios/kafkajs/compare/v1.14.0...f9d7dfec0f4d3735d6ba459795582116bd6567f6"
}
}
const createRetry = require('../retry')
const flatten = require('../utils/flatten')
const waitFor = require('../utils/waitFor')
const groupBy = require('../utils/groupBy')
const createConsumer = require('../consumer')

@@ -10,2 +11,3 @@ const InstrumentationEventEmitter = require('../instrumentation/emitter')

const RESOURCE_TYPES = require('../protocol/resourceTypes')
const CONFIG_RESOURCE_TYPES = require('../protocol/configResourceTypes')
const { EARLIEST_OFFSET, LATEST_OFFSET } = require('../constants')

@@ -505,3 +507,22 @@

const isBrokerConfig = type =>
[CONFIG_RESOURCE_TYPES.BROKER, CONFIG_RESOURCE_TYPES.BROKER_LOGGER].includes(type)
/**
* Broker configs can only be returned by the target broker
*
* @see
* https://github.com/apache/kafka/blob/821c1ac6641845aeca96a43bc2b946ecec5cba4f/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L3783
* https://github.com/apache/kafka/blob/821c1ac6641845aeca96a43bc2b946ecec5cba4f/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L2027
*
* @param {Broker} defaultBroker. Broker used in case the configuration is not a broker config
*/
const groupResourcesByBroker = ({ resources, defaultBroker }) =>
groupBy(resources, async ({ type, name: nodeId }) => {
return isBrokerConfig(type)
? await cluster.findBroker({ nodeId: String(nodeId) })
: defaultBroker
})
/**
* @param {Array<ResourceConfigQuery>} resources

@@ -512,3 +533,3 @@ * @param {boolean} [includeSynonyms=false]

* @typedef {Object} ResourceConfigQuery
* @property {ResourceType} type
* @property {ConfigResourceType} type
* @property {string} name

@@ -526,3 +547,3 @@ * @property {Array<String>} [configNames=[]]

const validResourceTypes = Object.values(RESOURCE_TYPES)
const validResourceTypes = Object.values(CONFIG_RESOURCE_TYPES)
const invalidType = resources.find(r => !validResourceTypes.includes(r.type))

@@ -560,5 +581,24 @@

await cluster.refreshMetadata()
const broker = await cluster.findControllerBroker()
const response = await broker.describeConfigs({ resources, includeSynonyms })
return response
const controller = await cluster.findControllerBroker()
const resourcerByBroker = await groupResourcesByBroker({
resources,
defaultBroker: controller,
})
const describeConfigsAction = async broker => {
const targetBroker = broker || controller
return targetBroker.describeConfigs({
resources: resourcerByBroker.get(targetBroker),
includeSynonyms,
})
}
const brokers = Array.from(resourcerByBroker.keys())
const responses = await Promise.all(brokers.map(describeConfigsAction))
const responseResources = responses.reduce(
(result, { resources }) => [...result, ...resources],
[]
)
return { resources: responseResources }
} catch (e) {

@@ -581,3 +621,3 @@ if (e.type === 'NOT_CONTROLLER') {

* @typedef {Object} ResourceConfig
* @property {ResourceType} type
* @property {ConfigResourceType} type
* @property {string} name

@@ -640,5 +680,24 @@ * @property {Array<ResourceConfigEntry>} configEntries

await cluster.refreshMetadata()
const broker = await cluster.findControllerBroker()
const response = await broker.alterConfigs({ resources, validateOnly: !!validateOnly })
return response
const controller = await cluster.findControllerBroker()
const resourcerByBroker = await groupResourcesByBroker({
resources,
defaultBroker: controller,
})
const alterConfigsAction = async broker => {
const targetBroker = broker || controller
return targetBroker.alterConfigs({
resources: resourcerByBroker.get(targetBroker),
validateOnly: !!validateOnly,
})
}
const brokers = Array.from(resourcerByBroker.keys())
const responses = await Promise.all(brokers.map(alterConfigsAction))
const responseResources = responses.reduce(
(result, { resources }) => [...result, ...resources],
[]
)
return { resources: responseResources }
} catch (e) {

@@ -645,0 +704,0 @@ if (e.type === 'NOT_CONTROLLER') {

// From:
// https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java#L31
/**
* @deprecated
* @see https://github.com/tulios/kafkajs/issues/649
*
* Use ConfigResourceTypes instead
*/
module.exports = {

@@ -5,0 +12,0 @@ /**

@@ -210,5 +210,11 @@ /// <reference types="node" />

}
export enum ConfigResourceTypes {
UNKNOWN = 0,
TOPIC = 2,
BROKER = 4,
BROKER_LOGGER = 8,
}
export interface ResourceConfigQuery {
type: ResourceTypes
type: ResourceTypes | ConfigResourceTypes
name: string

@@ -239,3 +245,3 @@ configNames?: string[]

resourceName: string
resourceType: ResourceTypes
resourceType: ResourceTypes | ConfigResourceTypes
}[]

@@ -246,3 +252,3 @@ throttleTime: number

export interface IResourceConfig {
type: ResourceTypes
type: ResourceTypes | ConfigResourceTypes
name: string

@@ -249,0 +255,0 @@ configEntries: { name: string; value: string }[]