Comparing version
{ | ||
"name": "kafkajs", | ||
"version": "1.13.0-beta.6", | ||
"version": "1.13.0-beta.7", | ||
"description": "A modern Apache Kafka client for node.js", | ||
@@ -82,5 +82,5 @@ "author": "Tulio Ornelas <ornelas.tulio@gmail.com>", | ||
"kafkajs": { | ||
"sha": "7c454fd713ff85c8cc87e90709f6893ce36a284b", | ||
"compare": "https://github.com/tulios/kafkajs/compare/v1.12.0...7c454fd713ff85c8cc87e90709f6893ce36a284b" | ||
"sha": "ef231493dfa0d3d969b6e2085ac68e703154feee", | ||
"compare": "https://github.com/tulios/kafkajs/compare/v1.12.0...ef231493dfa0d3d969b6e2085ac68e703154feee" | ||
} | ||
} |
@@ -127,3 +127,47 @@ const createRetry = require('../retry') | ||
} | ||
/** | ||
* @param {array} topicPartitions | ||
* @param {boolean} [validateOnly=false] | ||
* @param {number} [timeout=5000] | ||
* @return {Promise<void>} | ||
*/ | ||
const createPartitions = async ({ topicPartitions, validateOnly, timeout }) => { | ||
if (!topicPartitions || !Array.isArray(topicPartitions)) { | ||
throw new KafkaJSNonRetriableError(`Invalid topic partitions array ${topicPartitions}`) | ||
} | ||
if (topicPartitions.length === 0) { | ||
throw new KafkaJSNonRetriableError(`Empty topic partitions array`) | ||
} | ||
if (topicPartitions.filter(({ topic }) => typeof topic !== 'string').length > 0) { | ||
throw new KafkaJSNonRetriableError( | ||
'Invalid topic partitions array, the topic names have to be a valid string' | ||
) | ||
} | ||
const topicNames = new Set(topicPartitions.map(({ topic }) => topic)) | ||
if (topicNames.size < topicPartitions.length) { | ||
throw new KafkaJSNonRetriableError( | ||
'Invalid topic partitions array, it cannot have multiple entries for the same topic' | ||
) | ||
} | ||
const retrier = createRetry(retry) | ||
return retrier(async (bail, retryCount, retryTime) => { | ||
try { | ||
await cluster.refreshMetadata() | ||
const broker = await cluster.findControllerBroker() | ||
await broker.createPartitions({ topicPartitions, validateOnly, timeout }) | ||
} catch (e) { | ||
if (e.type === 'NOT_CONTROLLER') { | ||
logger.warn('Could not create topics', { error: e.message, retryCount, retryTime }) | ||
throw e | ||
} | ||
bail(e) | ||
} | ||
}) | ||
} | ||
/** | ||
@@ -689,2 +733,3 @@ * @param {string[]} topics | ||
deleteTopics, | ||
createPartitions, | ||
getTopicMetadata, | ||
@@ -691,0 +736,0 @@ fetchTopicMetadata, |
@@ -509,2 +509,23 @@ const Long = require('long') | ||
* @public | ||
* @param {Array} topicPartitions e.g: | ||
* [ | ||
* { | ||
* topic: 'topic-name', | ||
* count: 3, | ||
* assignments: [] | ||
* } | ||
* ] | ||
* @param {boolean} [validateOnly=false] If this is true, the request will be validated, but the topic | ||
* won't be created | ||
* @param {number} [timeout=5000] The time in ms to wait for a topic to be completely created | ||
* on the controller node | ||
* @returns {Promise<void>} | ||
*/ | ||
async createPartitions({ topicPartitions, validateOnly = false, timeout = 5000 }) { | ||
const createPartitions = this.lookupRequest(apiKeys.CreatePartitions, requests.CreatePartitions) | ||
return await this.connection.send(createPartitions({ topicPartitions, validateOnly, timeout })) | ||
} | ||
/** | ||
* @public | ||
* @param {Array<string>} topics An array of topics to be deleted | ||
@@ -511,0 +532,0 @@ * @param {number} [timeout=5000] The time in ms to wait for a topic to be completely deleted on the |
@@ -42,3 +42,3 @@ const apiKeys = require('./apiKeys') | ||
SaslAuthenticate: require('./saslAuthenticate'), | ||
CreatePartitions: {}, | ||
CreatePartitions: require('./createPartitions'), | ||
CreateDelegationToken: {}, | ||
@@ -45,0 +45,0 @@ RenewDelegationToken: {}, |
@@ -182,2 +182,8 @@ /// <reference types="node" /> | ||
export interface ITopicPartitionConfig { | ||
topic: string | ||
count: number | ||
assignments?: Array<Array<number>> | ||
} | ||
export interface ITopicMetadata { | ||
@@ -302,2 +308,7 @@ name: string | ||
deleteTopics(options: { topics: string[]; timeout?: number }): Promise<void> | ||
createPartitions(options: { | ||
validateOnly?: boolean | ||
timeout?: number | ||
topicPartitions: ITopicPartitionConfig[] | ||
}): Promise<boolean> | ||
fetchTopicMetadata(options: { topics: string[] }): Promise<{ topics: Array<ITopicMetadata> }> | ||
@@ -304,0 +315,0 @@ fetchOffsets(options: { |
505187
1.22%282
1.81%14932
1.13%