Comparing version
@@ -7,2 +7,3 @@ const Kafka = require('./src') | ||
const ResourceTypes = require('./src/protocol/resourceTypes') | ||
const ConfigResourceTypes = require('./src/protocol/configResourceTypes') | ||
const { LEVELS } = require('./src/loggers') | ||
@@ -18,3 +19,10 @@ | ||
CompressionCodecs: Compression.Codecs, | ||
/** | ||
* @deprecated | ||
* @see https://github.com/tulios/kafkajs/issues/649 | ||
* | ||
* Use ConfigResourceTypes instead | ||
*/ | ||
ResourceTypes, | ||
ConfigResourceTypes, | ||
} |
{ | ||
"name": "kafkajs", | ||
"version": "1.15.0-beta.5", | ||
"version": "1.15.0-beta.6", | ||
"description": "A modern Apache Kafka client for node.js", | ||
@@ -83,5 +83,5 @@ "author": "Tulio Ornelas <ornelas.tulio@gmail.com>", | ||
"kafkajs": { | ||
"sha": "0cf924200380b81ab266e6b5c08ad951052c15a7", | ||
"compare": "https://github.com/tulios/kafkajs/compare/v1.14.0...0cf924200380b81ab266e6b5c08ad951052c15a7" | ||
"sha": "f9d7dfec0f4d3735d6ba459795582116bd6567f6", | ||
"compare": "https://github.com/tulios/kafkajs/compare/v1.14.0...f9d7dfec0f4d3735d6ba459795582116bd6567f6" | ||
} | ||
} |
const createRetry = require('../retry') | ||
const flatten = require('../utils/flatten') | ||
const waitFor = require('../utils/waitFor') | ||
const groupBy = require('../utils/groupBy') | ||
const createConsumer = require('../consumer') | ||
@@ -10,2 +11,3 @@ const InstrumentationEventEmitter = require('../instrumentation/emitter') | ||
const RESOURCE_TYPES = require('../protocol/resourceTypes') | ||
const CONFIG_RESOURCE_TYPES = require('../protocol/configResourceTypes') | ||
const { EARLIEST_OFFSET, LATEST_OFFSET } = require('../constants') | ||
@@ -505,3 +507,22 @@ | ||
const isBrokerConfig = type => | ||
[CONFIG_RESOURCE_TYPES.BROKER, CONFIG_RESOURCE_TYPES.BROKER_LOGGER].includes(type) | ||
/** | ||
* Broker configs can only be returned by the target broker | ||
* | ||
* @see | ||
* https://github.com/apache/kafka/blob/821c1ac6641845aeca96a43bc2b946ecec5cba4f/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L3783 | ||
* https://github.com/apache/kafka/blob/821c1ac6641845aeca96a43bc2b946ecec5cba4f/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L2027 | ||
* | ||
* @param {Broker} defaultBroker. Broker used in case the configuration is not a broker config | ||
*/ | ||
const groupResourcesByBroker = ({ resources, defaultBroker }) => | ||
groupBy(resources, async ({ type, name: nodeId }) => { | ||
return isBrokerConfig(type) | ||
? await cluster.findBroker({ nodeId: String(nodeId) }) | ||
: defaultBroker | ||
}) | ||
/** | ||
* @param {Array<ResourceConfigQuery>} resources | ||
@@ -512,3 +533,3 @@ * @param {boolean} [includeSynonyms=false] | ||
* @typedef {Object} ResourceConfigQuery | ||
* @property {ResourceType} type | ||
* @property {ConfigResourceType} type | ||
* @property {string} name | ||
@@ -526,3 +547,3 @@ * @property {Array<String>} [configNames=[]] | ||
const validResourceTypes = Object.values(RESOURCE_TYPES) | ||
const validResourceTypes = Object.values(CONFIG_RESOURCE_TYPES) | ||
const invalidType = resources.find(r => !validResourceTypes.includes(r.type)) | ||
@@ -560,5 +581,24 @@ | ||
await cluster.refreshMetadata() | ||
const broker = await cluster.findControllerBroker() | ||
const response = await broker.describeConfigs({ resources, includeSynonyms }) | ||
return response | ||
const controller = await cluster.findControllerBroker() | ||
const resourcerByBroker = await groupResourcesByBroker({ | ||
resources, | ||
defaultBroker: controller, | ||
}) | ||
const describeConfigsAction = async broker => { | ||
const targetBroker = broker || controller | ||
return targetBroker.describeConfigs({ | ||
resources: resourcerByBroker.get(targetBroker), | ||
includeSynonyms, | ||
}) | ||
} | ||
const brokers = Array.from(resourcerByBroker.keys()) | ||
const responses = await Promise.all(brokers.map(describeConfigsAction)) | ||
const responseResources = responses.reduce( | ||
(result, { resources }) => [...result, ...resources], | ||
[] | ||
) | ||
return { resources: responseResources } | ||
} catch (e) { | ||
@@ -581,3 +621,3 @@ if (e.type === 'NOT_CONTROLLER') { | ||
* @typedef {Object} ResourceConfig | ||
* @property {ResourceType} type | ||
* @property {ConfigResourceType} type | ||
* @property {string} name | ||
@@ -640,5 +680,24 @@ * @property {Array<ResourceConfigEntry>} configEntries | ||
await cluster.refreshMetadata() | ||
const broker = await cluster.findControllerBroker() | ||
const response = await broker.alterConfigs({ resources, validateOnly: !!validateOnly }) | ||
return response | ||
const controller = await cluster.findControllerBroker() | ||
const resourcerByBroker = await groupResourcesByBroker({ | ||
resources, | ||
defaultBroker: controller, | ||
}) | ||
const alterConfigsAction = async broker => { | ||
const targetBroker = broker || controller | ||
return targetBroker.alterConfigs({ | ||
resources: resourcerByBroker.get(targetBroker), | ||
validateOnly: !!validateOnly, | ||
}) | ||
} | ||
const brokers = Array.from(resourcerByBroker.keys()) | ||
const responses = await Promise.all(brokers.map(alterConfigsAction)) | ||
const responseResources = responses.reduce( | ||
(result, { resources }) => [...result, ...resources], | ||
[] | ||
) | ||
return { resources: responseResources } | ||
} catch (e) { | ||
@@ -645,0 +704,0 @@ if (e.type === 'NOT_CONTROLLER') { |
// From: | ||
// https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java#L31 | ||
/** | ||
* @deprecated | ||
* @see https://github.com/tulios/kafkajs/issues/649 | ||
* | ||
* Use ConfigResourceTypes instead | ||
*/ | ||
module.exports = { | ||
@@ -5,0 +12,0 @@ /** |
@@ -210,5 +210,11 @@ /// <reference types="node" /> | ||
} | ||
export enum ConfigResourceTypes { | ||
UNKNOWN = 0, | ||
TOPIC = 2, | ||
BROKER = 4, | ||
BROKER_LOGGER = 8, | ||
} | ||
export interface ResourceConfigQuery { | ||
type: ResourceTypes | ||
type: ResourceTypes | ConfigResourceTypes | ||
name: string | ||
@@ -239,3 +245,3 @@ configNames?: string[] | ||
resourceName: string | ||
resourceType: ResourceTypes | ||
resourceType: ResourceTypes | ConfigResourceTypes | ||
}[] | ||
@@ -246,3 +252,3 @@ throttleTime: number | ||
export interface IResourceConfig { | ||
type: ResourceTypes | ||
type: ResourceTypes | ConfigResourceTypes | ||
name: string | ||
@@ -249,0 +255,0 @@ configEntries: { name: string; value: string }[] |
601434
0.54%328
0.61%17724
0.5%