Comparing version
{ | ||
"name": "kafkajs", | ||
"version": "1.15.0-beta.13", | ||
"version": "1.15.0-beta.14", | ||
"description": "A modern Apache Kafka client for node.js", | ||
@@ -83,5 +83,5 @@ "author": "Tulio Ornelas <ornelas.tulio@gmail.com>", | ||
"kafkajs": { | ||
"sha": "5b90d16106eb0372d23c6d31ee139958797ec5a2", | ||
"compare": "https://github.com/tulios/kafkajs/compare/v1.14.0...5b90d16106eb0372d23c6d31ee139958797ec5a2" | ||
"sha": "f7a166488321216a0feec4428f8e589b116eb31f", | ||
"compare": "https://github.com/tulios/kafkajs/compare/v1.14.0...f7a166488321216a0feec4428f8e589b116eb31f" | ||
} | ||
} |
@@ -38,3 +38,3 @@ [](https://www.npmjs.com/package/kafkajs) [](https://www.npmjs.com/package/kafkajs) [](https://dev.azure.com/tulios/kafkajs/_build/latest?definitionId=2&branchName=master) [](https://kafkajs-slackin.herokuapp.com/) | ||
- [Contact](#contact) | ||
- [Sponsors](#sponsors) | ||
- [Sponsors](#sponsorship) | ||
- [License](#license) | ||
@@ -138,5 +138,5 @@ - [Acknowledgements](#acknowledgements) | ||
## <a name="sponsors"></a> Sponsors ❤️ | ||
## <a name="sponsorship"></a> Sponsors ❤️ | ||
*To become a sponsor, [reach out in our Slack community](https://kafkajs-slackin.herokuapp.com/) to get in touch with one of the maintainers. Also consider becoming a Github Sponsor by following any of the links under "Sponsor this project" in the sidebar.* | ||
*To become a sponsor, [reach out in our Slack community](https://kafkajs-slackin.herokuapp.com/) to get in touch with one of the maintainers. Also consider becoming a Github Sponsor by following any of the links under "[Sponsor this project](https://github.com/tulios/kafkajs#sponsors)" in the sidebar.* | ||
@@ -143,0 +143,0 @@ <a href="https://www.confluent.io/confluent-cloud/?utm_source=kafkajs&utm_medium=opensource&utm_campaign=referral"> |
@@ -9,3 +9,9 @@ const createRetry = require('../retry') | ||
const { LEVELS } = require('../loggers') | ||
const { KafkaJSNonRetriableError, KafkaJSDeleteGroupsError } = require('../errors') | ||
const { | ||
KafkaJSNonRetriableError, | ||
KafkaJSDeleteGroupsError, | ||
KafkaJSBrokerNotFound, | ||
KafkaJSDeleteTopicRecordsError, | ||
} = require('../errors') | ||
const { staleMetadata } = require('../protocol/error') | ||
const CONFIG_RESOURCE_TYPES = require('../protocol/configResourceTypes') | ||
@@ -22,3 +28,3 @@ const ACL_RESOURCE_TYPES = require('../protocol/aclResourceTypes') | ||
const { values, keys } = Object | ||
const { values, keys, entries } = Object | ||
const eventNames = values(events) | ||
@@ -996,2 +1002,128 @@ const eventKeys = keys(events) | ||
/** | ||
* Delete topic records up to the selected partition offsets | ||
* | ||
* @param {string} topic | ||
* @param {Array<SeekEntry>} partitions | ||
* @return {Promise} | ||
* | ||
* @typedef {Object} SeekEntry | ||
* @property {number} partition | ||
* @property {string} offset | ||
*/ | ||
const deleteTopicRecords = async ({ topic, partitions }) => { | ||
if (!topic || typeof topic !== 'string') { | ||
throw new KafkaJSNonRetriableError(`Invalid topic "${topic}"`) | ||
} | ||
if (!partitions || partitions.length === 0) { | ||
throw new KafkaJSNonRetriableError(`Invalid partitions`) | ||
} | ||
const partitionsByBroker = cluster.findLeaderForPartitions( | ||
topic, | ||
partitions.map(p => p.partition) | ||
) | ||
const partitionsFound = flatten(values(partitionsByBroker)) | ||
const topicOffsets = await fetchTopicOffsets(topic) | ||
const leaderNotFoundErrors = [] | ||
partitions.forEach(({ partition, offset }) => { | ||
// throw if no leader found for partition | ||
if (!partitionsFound.includes(partition)) { | ||
leaderNotFoundErrors.push({ | ||
partition, | ||
offset, | ||
error: new KafkaJSBrokerNotFound('Could not find the leader for the partition', { | ||
retriable: false, | ||
}), | ||
}) | ||
return | ||
} | ||
const { low } = topicOffsets.find(p => p.partition === partition) || { | ||
high: undefined, | ||
low: undefined, | ||
} | ||
// warn in case of offset below low watermark | ||
if (parseInt(offset) < parseInt(low)) { | ||
logger.warn( | ||
'The requested offset is before the earliest offset maintained on the partition - no records will be deleted from this partition', | ||
{ | ||
topic, | ||
partition, | ||
offset, | ||
} | ||
) | ||
} | ||
}) | ||
if (leaderNotFoundErrors.length > 0) { | ||
throw new KafkaJSDeleteTopicRecordsError({ topic, partitions: leaderNotFoundErrors }) | ||
} | ||
const seekEntriesByBroker = entries(partitionsByBroker).reduce( | ||
(obj, [nodeId, nodePartitions]) => { | ||
obj[nodeId] = { | ||
topic, | ||
partitions: partitions.filter(p => nodePartitions.includes(p.partition)), | ||
} | ||
return obj | ||
}, | ||
{} | ||
) | ||
const retrier = createRetry(retry) | ||
return retrier(async bail => { | ||
try { | ||
const partitionErrors = [] | ||
const brokerRequests = entries(seekEntriesByBroker).map( | ||
([nodeId, { topic, partitions }]) => async () => { | ||
const broker = await cluster.findBroker({ nodeId }) | ||
await broker.deleteRecords({ topics: [{ topic, partitions }] }) | ||
// remove successful entry so it's ignored on retry | ||
delete seekEntriesByBroker[nodeId] | ||
} | ||
) | ||
await Promise.all( | ||
brokerRequests.map(request => | ||
request().catch(e => { | ||
if (e.name === 'KafkaJSDeleteTopicRecordsError') { | ||
e.partitions.forEach(({ partition, offset, error }) => { | ||
partitionErrors.push({ | ||
partition, | ||
offset, | ||
error, | ||
}) | ||
}) | ||
} else { | ||
// then it's an unknown error, not from the broker response | ||
throw e | ||
} | ||
}) | ||
) | ||
) | ||
if (partitionErrors.length > 0) { | ||
throw new KafkaJSDeleteTopicRecordsError({ | ||
topic, | ||
partitions: partitionErrors, | ||
}) | ||
} | ||
} catch (e) { | ||
if ( | ||
e.retriable && | ||
e.partitions.some( | ||
({ error }) => staleMetadata(error) || error.name === 'KafkaJSMetadataNotLoaded' | ||
) | ||
) { | ||
await cluster.refreshMetadata() | ||
} | ||
throw e | ||
} | ||
}) | ||
} | ||
/** | ||
* @param {Array<ACLEntry>} acl | ||
@@ -1347,3 +1479,4 @@ * @return {Promise<void>} | ||
createAcls, | ||
deleteTopicRecords, | ||
} | ||
} |
@@ -762,3 +762,35 @@ const Long = require('../utils/long') | ||
/** | ||
* Send request to delete records | ||
* @public | ||
* @param {Array<Object>} topics | ||
* [ | ||
* { | ||
* topic: 'my-topic-name', | ||
* partitions: [ | ||
* { partition: 0, offset 2 }, | ||
* { partition: 1, offset 4 }, | ||
* ], | ||
* } | ||
* ] | ||
* @returns {Promise<Array>} example: | ||
* { | ||
* throttleTime: 0 | ||
* [ | ||
* { | ||
* topic: 'my-topic-name', | ||
* partitions: [ | ||
* { partition: 0, lowWatermark: '2n', errorCode: 0 }, | ||
* { partition: 1, lowWatermark: '4n', errorCode: 0 }, | ||
* ], | ||
* }, | ||
* ] | ||
* } | ||
*/ | ||
async deleteRecords({ topics }) { | ||
const deleteRecords = this.lookupRequest(apiKeys.DeleteRecords, requests.DeleteRecords) | ||
return await this[PRIVATE.SEND_REQUEST](deleteRecords({ topics })) | ||
} | ||
/** | ||
* @public | ||
* @param {Array} ACL e.g: | ||
@@ -765,0 +797,0 @@ * [ |
@@ -183,2 +183,17 @@ const pkgJson = require('../package.json') | ||
class KafkaJSDeleteTopicRecordsError extends KafkaJSError { | ||
constructor({ partitions }) { | ||
/* | ||
* This error is retriable if all the errors were retriable | ||
*/ | ||
const retriable = partitions | ||
.filter(({ error }) => error != null) | ||
.every(({ error }) => error.retriable === true) | ||
super('Error while deleting records', { retriable }) | ||
this.name = 'KafkaJSDeleteTopicRecordsError' | ||
this.partitions = partitions | ||
} | ||
} | ||
const issueUrl = pkgJson.bugs.url | ||
@@ -220,3 +235,4 @@ | ||
KafkaJSUnsupportedMagicByteInMessageSet, | ||
KafkaJSDeleteTopicRecordsError, | ||
KafkaJSInvariantViolation, | ||
} |
const createRetry = require('../retry') | ||
const flatten = require('../utils/flatten') | ||
const { KafkaJSMetadataNotLoaded } = require('../errors') | ||
const { staleMetadata } = require('../protocol/error') | ||
const groupMessagesPerPartition = require('./groupMessagesPerPartition') | ||
@@ -10,6 +11,2 @@ const createTopicData = require('./createTopicData') | ||
const TOTAL_INDIVIDUAL_ATTEMPTS = 5 | ||
const staleMetadata = e => | ||
['UNKNOWN_TOPIC_OR_PARTITION', 'LEADER_NOT_AVAILABLE', 'NOT_LEADER_FOR_PARTITION'].includes( | ||
e.type | ||
) | ||
@@ -16,0 +13,0 @@ module.exports = ({ logger, cluster, partitioner, eosManager }) => { |
@@ -590,2 +590,7 @@ const { KafkaJSProtocolError } = require('../errors') | ||
const staleMetadata = e => | ||
['UNKNOWN_TOPIC_OR_PARTITION', 'LEADER_NOT_AVAILABLE', 'NOT_LEADER_FOR_PARTITION'].includes( | ||
e.type | ||
) | ||
module.exports = { | ||
@@ -596,2 +601,3 @@ failure, | ||
failIfVersionNotSupported, | ||
staleMetadata, | ||
} |
@@ -26,3 +26,3 @@ const apiKeys = require('./apiKeys') | ||
DeleteTopics: require('./deleteTopics'), | ||
DeleteRecords: {}, | ||
DeleteRecords: require('./deleteRecords'), | ||
InitProducerId: require('./initProducerId'), | ||
@@ -29,0 +29,0 @@ OffsetForLeaderEpoch: {}, |
@@ -458,2 +458,3 @@ /// <reference types="node" /> | ||
createAcls(options: { acls: AclEntry[] }): Promise<boolean> | ||
deleteTopicRecords(options: { topic: string; partitions: SeekEntry[] }): Promise<void> | ||
logger(): Logger | ||
@@ -968,2 +969,6 @@ on( | ||
export class KafkaJSDeleteTopicRecordsError extends KafkaJSError { | ||
constructor(metadata: KafkaJSDeleteTopicRecordsErrorTopic) | ||
} | ||
export interface KafkaJSDeleteGroupsErrorGroups { | ||
@@ -975,2 +980,14 @@ groupId: string | ||
export interface KafkaJSDeleteTopicRecordsErrorTopic { | ||
topic: string, | ||
partitions: KafkaJSDeleteTopicRecordsErrorPartition[] | ||
} | ||
export interface KafkaJSDeleteTopicRecordsErrorPartition { | ||
partition: number; | ||
offset: string; | ||
error: KafkaJSError | ||
} | ||
export interface KafkaJSErrorMetadata { | ||
@@ -977,0 +994,0 @@ retriable?: boolean |
648326
1.74%352
1.44%19214
1.72%