New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

kafkajs

Package Overview
Dependencies
Maintainers
2
Versions
299
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

kafkajs - npm Package Compare versions

Comparing version 1.13.0-beta.12 to 1.13.0-beta.13

src/protocol/requests/deleteGroups/index.js

6

package.json
{
"name": "kafkajs",
"version": "1.13.0-beta.12",
"version": "1.13.0-beta.13",
"description": "A modern Apache Kafka client for node.js",

@@ -82,5 +82,5 @@ "author": "Tulio Ornelas <ornelas.tulio@gmail.com>",

"kafkajs": {
"sha": "9951162b69a710ed98a8967ca748eb0aa8f3af01",
"compare": "https://github.com/tulios/kafkajs/compare/v1.12.0...9951162b69a710ed98a8967ca748eb0aa8f3af01"
"sha": "3601d7a7ba32137179df1e44e2a5fab4ffd40f6b",
"compare": "https://github.com/tulios/kafkajs/compare/v1.12.0...3601d7a7ba32137179df1e44e2a5fab4ffd40f6b"
}
}
const createRetry = require('../retry')
const flatten = require('../utils/flatten')
const waitFor = require('../utils/waitFor')

@@ -7,3 +8,3 @@ const createConsumer = require('../consumer')

const { LEVELS } = require('../loggers')
const { KafkaJSNonRetriableError } = require('../errors')
const { KafkaJSNonRetriableError, KafkaJSDeleteGroupsError } = require('../errors')
const RESOURCE_TYPES = require('../protocol/resourceTypes')

@@ -690,6 +691,5 @@

const listGroups = async () => {
await cluster.refreshMetadata()
let groups = []
for (var nodeId in cluster.brokerPool.brokers) {
await cluster.refreshMetadata()
const broker = await cluster.findBroker({ nodeId })

@@ -704,2 +704,76 @@ const response = await broker.listGroups()

/**
* Delete groups in a broker
*
* @param {string[]} [groupIds]
* @return {Promise<DeleteGroups>}
*
* @typedef {Array} DeleteGroups
* @property {string} groupId
* @property {number} errorCode
*/
const deleteGroups = async groupIds => {
if (!groupIds || !Array.isArray(groupIds)) {
throw new KafkaJSNonRetriableError(`Invalid groupIds array ${groupIds}`)
}
const invalidGroupId = groupIds.some(g => typeof g !== 'string')
if (invalidGroupId) {
throw new KafkaJSNonRetriableError(`Invalid groupId name: ${JSON.stringify(invalidGroupId)}`)
}
const retrier = createRetry(retry)
let results = []
let clonedGroupIds = groupIds.slice()
return retrier(async (bail, retryCount, retryTime) => {
try {
if (clonedGroupIds.length === 0) return []
await cluster.refreshMetadata()
const brokersPerGroups = {}
const brokersPerNode = {}
for (const groupId of clonedGroupIds) {
const broker = await cluster.findGroupCoordinator({ groupId })
if (brokersPerGroups[broker.nodeId] === undefined) brokersPerGroups[broker.nodeId] = []
brokersPerGroups[broker.nodeId].push(groupId)
brokersPerNode[broker.nodeId] = broker
}
const res = await Promise.all(
Object.keys(brokersPerNode).map(
async nodeId => await brokersPerNode[nodeId].deleteGroups(brokersPerGroups[nodeId])
)
)
const errors = flatten(
res.map(({ results }) =>
results.map(({ groupId, errorCode, error }) => {
return { groupId, errorCode, error }
})
)
).filter(({ errorCode }) => errorCode !== 0)
clonedGroupIds = errors.map(({ groupId }) => groupId)
if (errors.length > 0) throw new KafkaJSDeleteGroupsError('Error in DeleteGroups', errors)
results = flatten(res.map(({ results }) => results))
return results
} catch (e) {
if (e.type === 'NOT_CONTROLLER' || e.type === 'COORDINATOR_NOT_AVAILABLE') {
logger.warn('Could not delete groups', { error: e.message, retryCount, retryTime })
throw e
}
bail(e)
}
})
}
/**
* @param {string} eventName

@@ -749,3 +823,4 @@ * @param {Function} listener

listGroups,
deleteGroups,
}
}

@@ -695,2 +695,13 @@ const Long = require('long')

/**
* Send request to delete groups
* @param {Array<string>} groupIds
* @public
* @returns {Promise}
*/
async deleteGroups(groupIds) {
const deleteGroups = this.lookupRequest(apiKeys.DeleteGroups, requests.DeleteGroups)
return await this.connection.send(deleteGroups(groupIds))
}
/***

@@ -697,0 +708,0 @@ * @private

@@ -91,2 +91,10 @@ class KafkaJSError extends Error {

class KafkaJSDeleteGroupsError extends KafkaJSError {
constructor(e, groups = []) {
super(e)
this.groups = groups
this.name = 'KafkaJSDeleteGroupsError'
}
}
class KafkaJSServerDoesNotSupportApiKey extends KafkaJSNonRetriableError {

@@ -173,2 +181,3 @@ constructor(e, { apiKey, apiName } = {}) {

KafkaJSStaleTopicMetadataAssignment,
KafkaJSDeleteGroupsError,
KafkaJSTimeout,

@@ -175,0 +184,0 @@ KafkaJSLockTimeout,

@@ -47,3 +47,3 @@ const apiKeys = require('./apiKeys')

DescribeDelegationToken: {},
DeleteGroups: {},
DeleteGroups: require('./deleteGroups'),
}

@@ -50,0 +50,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc