New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

kafkajs

Package Overview
Dependencies
Maintainers
2
Versions
299
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

kafkajs - npm Package Compare versions

Comparing version

to
1.13.0-beta.7

src/protocol/requests/createPartitions/index.js

6

package.json
{
"name": "kafkajs",
"version": "1.13.0-beta.6",
"version": "1.13.0-beta.7",
"description": "A modern Apache Kafka client for node.js",

@@ -82,5 +82,5 @@ "author": "Tulio Ornelas <ornelas.tulio@gmail.com>",

"kafkajs": {
"sha": "7c454fd713ff85c8cc87e90709f6893ce36a284b",
"compare": "https://github.com/tulios/kafkajs/compare/v1.12.0...7c454fd713ff85c8cc87e90709f6893ce36a284b"
"sha": "ef231493dfa0d3d969b6e2085ac68e703154feee",
"compare": "https://github.com/tulios/kafkajs/compare/v1.12.0...ef231493dfa0d3d969b6e2085ac68e703154feee"
}
}

@@ -127,3 +127,47 @@ const createRetry = require('../retry')

}
/**
* @param {array} topicPartitions
* @param {boolean} [validateOnly=false]
* @param {number} [timeout=5000]
* @return {Promise<void>}
*/
const createPartitions = async ({ topicPartitions, validateOnly, timeout }) => {
if (!topicPartitions || !Array.isArray(topicPartitions)) {
throw new KafkaJSNonRetriableError(`Invalid topic partitions array ${topicPartitions}`)
}
if (topicPartitions.length === 0) {
throw new KafkaJSNonRetriableError(`Empty topic partitions array`)
}
if (topicPartitions.filter(({ topic }) => typeof topic !== 'string').length > 0) {
throw new KafkaJSNonRetriableError(
'Invalid topic partitions array, the topic names have to be a valid string'
)
}
const topicNames = new Set(topicPartitions.map(({ topic }) => topic))
if (topicNames.size < topicPartitions.length) {
throw new KafkaJSNonRetriableError(
'Invalid topic partitions array, it cannot have multiple entries for the same topic'
)
}
const retrier = createRetry(retry)
return retrier(async (bail, retryCount, retryTime) => {
try {
await cluster.refreshMetadata()
const broker = await cluster.findControllerBroker()
await broker.createPartitions({ topicPartitions, validateOnly, timeout })
} catch (e) {
if (e.type === 'NOT_CONTROLLER') {
logger.warn('Could not create topics', { error: e.message, retryCount, retryTime })
throw e
}
bail(e)
}
})
}
/**

@@ -689,2 +733,3 @@ * @param {string[]} topics

deleteTopics,
createPartitions,
getTopicMetadata,

@@ -691,0 +736,0 @@ fetchTopicMetadata,

@@ -509,2 +509,23 @@ const Long = require('long')

* @public
* @param {Array} topicPartitions e.g:
* [
* {
* topic: 'topic-name',
* count: 3,
* assignments: []
* }
* ]
* @param {boolean} [validateOnly=false] If this is true, the request will be validated, but the topic
* won't be created
* @param {number} [timeout=5000] The time in ms to wait for a topic to be completely created
* on the controller node
* @returns {Promise<void>}
*/
async createPartitions({ topicPartitions, validateOnly = false, timeout = 5000 }) {
const createPartitions = this.lookupRequest(apiKeys.CreatePartitions, requests.CreatePartitions)
return await this.connection.send(createPartitions({ topicPartitions, validateOnly, timeout }))
}
/**
* @public
* @param {Array<string>} topics An array of topics to be deleted

@@ -511,0 +532,0 @@ * @param {number} [timeout=5000] The time in ms to wait for a topic to be completely deleted on the

@@ -42,3 +42,3 @@ const apiKeys = require('./apiKeys')

SaslAuthenticate: require('./saslAuthenticate'),
CreatePartitions: {},
CreatePartitions: require('./createPartitions'),
CreateDelegationToken: {},

@@ -45,0 +45,0 @@ RenewDelegationToken: {},

@@ -182,2 +182,8 @@ /// <reference types="node" />

export interface ITopicPartitionConfig {
topic: string
count: number
assignments?: Array<Array<number>>
}
export interface ITopicMetadata {

@@ -302,2 +308,7 @@ name: string

deleteTopics(options: { topics: string[]; timeout?: number }): Promise<void>
createPartitions(options: {
validateOnly?: boolean
timeout?: number
topicPartitions: ITopicPartitionConfig[]
}): Promise<boolean>
fetchTopicMetadata(options: { topics: string[] }): Promise<{ topics: Array<ITopicMetadata> }>

@@ -304,0 +315,0 @@ fetchOffsets(options: {