Comparing version 1.13.0-beta.12 to 1.13.0-beta.13
{ | ||
"name": "kafkajs", | ||
"version": "1.13.0-beta.12", | ||
"version": "1.13.0-beta.13", | ||
"description": "A modern Apache Kafka client for node.js", | ||
@@ -82,5 +82,5 @@ "author": "Tulio Ornelas <ornelas.tulio@gmail.com>", | ||
"kafkajs": { | ||
"sha": "9951162b69a710ed98a8967ca748eb0aa8f3af01", | ||
"compare": "https://github.com/tulios/kafkajs/compare/v1.12.0...9951162b69a710ed98a8967ca748eb0aa8f3af01" | ||
"sha": "3601d7a7ba32137179df1e44e2a5fab4ffd40f6b", | ||
"compare": "https://github.com/tulios/kafkajs/compare/v1.12.0...3601d7a7ba32137179df1e44e2a5fab4ffd40f6b" | ||
} | ||
} |
const createRetry = require('../retry') | ||
const flatten = require('../utils/flatten') | ||
const waitFor = require('../utils/waitFor') | ||
@@ -7,3 +8,3 @@ const createConsumer = require('../consumer') | ||
const { LEVELS } = require('../loggers') | ||
const { KafkaJSNonRetriableError } = require('../errors') | ||
const { KafkaJSNonRetriableError, KafkaJSDeleteGroupsError } = require('../errors') | ||
const RESOURCE_TYPES = require('../protocol/resourceTypes') | ||
@@ -690,6 +691,5 @@ | ||
const listGroups = async () => { | ||
await cluster.refreshMetadata() | ||
let groups = [] | ||
for (var nodeId in cluster.brokerPool.brokers) { | ||
await cluster.refreshMetadata() | ||
const broker = await cluster.findBroker({ nodeId }) | ||
@@ -704,2 +704,76 @@ const response = await broker.listGroups() | ||
/** | ||
* Delete groups in a broker | ||
* | ||
* @param {string[]} [groupIds] | ||
* @return {Promise<DeleteGroups>} | ||
* | ||
* @typedef {Array} DeleteGroups | ||
* @property {string} groupId | ||
* @property {number} errorCode | ||
*/ | ||
const deleteGroups = async groupIds => { | ||
if (!groupIds || !Array.isArray(groupIds)) { | ||
throw new KafkaJSNonRetriableError(`Invalid groupIds array ${groupIds}`) | ||
} | ||
const invalidGroupId = groupIds.some(g => typeof g !== 'string') | ||
if (invalidGroupId) { | ||
throw new KafkaJSNonRetriableError(`Invalid groupId name: ${JSON.stringify(invalidGroupId)}`) | ||
} | ||
const retrier = createRetry(retry) | ||
let results = [] | ||
let clonedGroupIds = groupIds.slice() | ||
return retrier(async (bail, retryCount, retryTime) => { | ||
try { | ||
if (clonedGroupIds.length === 0) return [] | ||
await cluster.refreshMetadata() | ||
const brokersPerGroups = {} | ||
const brokersPerNode = {} | ||
for (const groupId of clonedGroupIds) { | ||
const broker = await cluster.findGroupCoordinator({ groupId }) | ||
if (brokersPerGroups[broker.nodeId] === undefined) brokersPerGroups[broker.nodeId] = [] | ||
brokersPerGroups[broker.nodeId].push(groupId) | ||
brokersPerNode[broker.nodeId] = broker | ||
} | ||
const res = await Promise.all( | ||
Object.keys(brokersPerNode).map( | ||
async nodeId => await brokersPerNode[nodeId].deleteGroups(brokersPerGroups[nodeId]) | ||
) | ||
) | ||
const errors = flatten( | ||
res.map(({ results }) => | ||
results.map(({ groupId, errorCode, error }) => { | ||
return { groupId, errorCode, error } | ||
}) | ||
) | ||
).filter(({ errorCode }) => errorCode !== 0) | ||
clonedGroupIds = errors.map(({ groupId }) => groupId) | ||
if (errors.length > 0) throw new KafkaJSDeleteGroupsError('Error in DeleteGroups', errors) | ||
results = flatten(res.map(({ results }) => results)) | ||
return results | ||
} catch (e) { | ||
if (e.type === 'NOT_CONTROLLER' || e.type === 'COORDINATOR_NOT_AVAILABLE') { | ||
logger.warn('Could not delete groups', { error: e.message, retryCount, retryTime }) | ||
throw e | ||
} | ||
bail(e) | ||
} | ||
}) | ||
} | ||
/** | ||
* @param {string} eventName | ||
@@ -749,3 +823,4 @@ * @param {Function} listener | ||
listGroups, | ||
deleteGroups, | ||
} | ||
} |
@@ -695,2 +695,13 @@ const Long = require('long') | ||
/** | ||
* Send request to delete groups | ||
* @param {Array<string>} groupIds | ||
* @public | ||
* @returns {Promise} | ||
*/ | ||
async deleteGroups(groupIds) { | ||
const deleteGroups = this.lookupRequest(apiKeys.DeleteGroups, requests.DeleteGroups) | ||
return await this.connection.send(deleteGroups(groupIds)) | ||
} | ||
/*** | ||
@@ -697,0 +708,0 @@ * @private |
@@ -91,2 +91,10 @@ class KafkaJSError extends Error { | ||
class KafkaJSDeleteGroupsError extends KafkaJSError { | ||
constructor(e, groups = []) { | ||
super(e) | ||
this.groups = groups | ||
this.name = 'KafkaJSDeleteGroupsError' | ||
} | ||
} | ||
class KafkaJSServerDoesNotSupportApiKey extends KafkaJSNonRetriableError { | ||
@@ -173,2 +181,3 @@ constructor(e, { apiKey, apiName } = {}) { | ||
KafkaJSStaleTopicMetadataAssignment, | ||
KafkaJSDeleteGroupsError, | ||
KafkaJSTimeout, | ||
@@ -175,0 +184,0 @@ KafkaJSLockTimeout, |
@@ -47,3 +47,3 @@ const apiKeys = require('./apiKeys') | ||
DescribeDelegationToken: {}, | ||
DeleteGroups: {}, | ||
DeleteGroups: require('./deleteGroups'), | ||
} | ||
@@ -50,0 +50,0 @@ |
512859
289
15179