kafkajs - npm Package Compare versions

Comparing version 1.15.0-beta.13 to 1.15.0-beta.14

package.json
{
"name": "kafkajs",
"version": "1.15.0-beta.13",
"version": "1.15.0-beta.14",
"description": "A modern Apache Kafka client for node.js",

@@ -83,5 +83,5 @@ "author": "Tulio Ornelas <ornelas.tulio@gmail.com>",

"kafkajs": {
"sha": "5b90d16106eb0372d23c6d31ee139958797ec5a2",
"compare": "https://github.com/tulios/kafkajs/compare/v1.14.0...5b90d16106eb0372d23c6d31ee139958797ec5a2"
"sha": "f7a166488321216a0feec4428f8e589b116eb31f",
"compare": "https://github.com/tulios/kafkajs/compare/v1.14.0...f7a166488321216a0feec4428f8e589b116eb31f"
}
}

@@ -38,3 +38,3 @@ [![npm version](https://img.shields.io/npm/v/kafkajs?color=%2344cc11&label=stable)](https://www.npmjs.com/package/kafkajs) [![npm pre-release version](https://img.shields.io/npm/v/kafkajs/beta?label=pre-release)](https://www.npmjs.com/package/kafkajs) [![Build Status](https://dev.azure.com/tulios/kafkajs/_apis/build/status/tulios.kafkajs?branchName=master)](https://dev.azure.com/tulios/kafkajs/_build/latest?definitionId=2&branchName=master) [![Slack Channel](https://kafkajs-slackin.herokuapp.com/badge.svg)](https://kafkajs-slackin.herokuapp.com/)

- [Contact](#contact)
- [Sponsors](#sponsors)
- [Sponsors](#sponsorship)
- [License](#license)

@@ -138,5 +138,5 @@ - [Acknowledgements](#acknowledgements)

## <a name="sponsors"></a> Sponsors ❤️
## <a name="sponsorship"></a> Sponsors ❤️
*To become a sponsor, [reach out in our Slack community](https://kafkajs-slackin.herokuapp.com/) to get in touch with one of the maintainers. Also consider becoming a Github Sponsor by following any of the links under "Sponsor this project" in the sidebar.*
*To become a sponsor, [reach out in our Slack community](https://kafkajs-slackin.herokuapp.com/) to get in touch with one of the maintainers. Also consider becoming a Github Sponsor by following any of the links under "[Sponsor this project](https://github.com/tulios/kafkajs#sponsors)" in the sidebar.*

@@ -143,0 +143,0 @@ <a href="https://www.confluent.io/confluent-cloud/?utm_source=kafkajs&utm_medium=opensource&utm_campaign=referral">

@@ -9,3 +9,9 @@ const createRetry = require('../retry')

const { LEVELS } = require('../loggers')
const { KafkaJSNonRetriableError, KafkaJSDeleteGroupsError } = require('../errors')
const {
KafkaJSNonRetriableError,
KafkaJSDeleteGroupsError,
KafkaJSBrokerNotFound,
KafkaJSDeleteTopicRecordsError,
} = require('../errors')
const { staleMetadata } = require('../protocol/error')
const CONFIG_RESOURCE_TYPES = require('../protocol/configResourceTypes')

@@ -22,3 +28,3 @@ const ACL_RESOURCE_TYPES = require('../protocol/aclResourceTypes')

const { values, keys } = Object
const { values, keys, entries } = Object
const eventNames = values(events)

@@ -996,2 +1002,128 @@ const eventKeys = keys(events)

/**
* Delete topic records up to the selected partition offsets
*
* @param {string} topic
* @param {Array<SeekEntry>} partitions
* @return {Promise}
*
* @typedef {Object} SeekEntry
* @property {number} partition
* @property {string} offset
*/
const deleteTopicRecords = async ({ topic, partitions }) => {
if (!topic || typeof topic !== 'string') {
throw new KafkaJSNonRetriableError(`Invalid topic "${topic}"`)
}
if (!partitions || partitions.length === 0) {
throw new KafkaJSNonRetriableError(`Invalid partitions`)
}
const partitionsByBroker = cluster.findLeaderForPartitions(
topic,
partitions.map(p => p.partition)
)
const partitionsFound = flatten(values(partitionsByBroker))
const topicOffsets = await fetchTopicOffsets(topic)
const leaderNotFoundErrors = []
partitions.forEach(({ partition, offset }) => {
// throw if no leader found for partition
if (!partitionsFound.includes(partition)) {
leaderNotFoundErrors.push({
partition,
offset,
error: new KafkaJSBrokerNotFound('Could not find the leader for the partition', {
retriable: false,
}),
})
return
}
const { low } = topicOffsets.find(p => p.partition === partition) || {
high: undefined,
low: undefined,
}
// warn in case of offset below low watermark
if (parseInt(offset) < parseInt(low)) {
logger.warn(
'The requested offset is before the earliest offset maintained on the partition - no records will be deleted from this partition',
{
topic,
partition,
offset,
}
)
}
})
if (leaderNotFoundErrors.length > 0) {
throw new KafkaJSDeleteTopicRecordsError({ topic, partitions: leaderNotFoundErrors })
}
const seekEntriesByBroker = entries(partitionsByBroker).reduce(
(obj, [nodeId, nodePartitions]) => {
obj[nodeId] = {
topic,
partitions: partitions.filter(p => nodePartitions.includes(p.partition)),
}
return obj
},
{}
)
const retrier = createRetry(retry)
return retrier(async bail => {
try {
const partitionErrors = []
const brokerRequests = entries(seekEntriesByBroker).map(
([nodeId, { topic, partitions }]) => async () => {
const broker = await cluster.findBroker({ nodeId })
await broker.deleteRecords({ topics: [{ topic, partitions }] })
// remove successful entry so it's ignored on retry
delete seekEntriesByBroker[nodeId]
}
)
await Promise.all(
brokerRequests.map(request =>
request().catch(e => {
if (e.name === 'KafkaJSDeleteTopicRecordsError') {
e.partitions.forEach(({ partition, offset, error }) => {
partitionErrors.push({
partition,
offset,
error,
})
})
} else {
// then it's an unknown error, not from the broker response
throw e
}
})
)
)
if (partitionErrors.length > 0) {
throw new KafkaJSDeleteTopicRecordsError({
topic,
partitions: partitionErrors,
})
}
} catch (e) {
if (
e.retriable &&
e.partitions.some(
({ error }) => staleMetadata(error) || error.name === 'KafkaJSMetadataNotLoaded'
)
) {
await cluster.refreshMetadata()
}
throw e
}
})
}
/**
* @param {Array<ACLEntry>} acl

@@ -1347,3 +1479,4 @@ * @return {Promise<void>}

createAcls,
deleteTopicRecords,
}
}
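
For context, a minimal usage sketch of the new admin method (the broker address and topic name are hypothetical; the call shape follows the JSDoc and the SeekEntry typedef above, where offset is a string):

const { Kafka } = require('kafkajs')

const kafka = new Kafka({ clientId: 'example-admin', brokers: ['localhost:9092'] }) // hypothetical broker
const admin = kafka.admin()

const run = async () => {
  await admin.connect()
  // Delete records on partition 0 up to (but not including) offset 30
  await admin.deleteTopicRecords({
    topic: 'example-topic', // hypothetical topic
    partitions: [{ partition: 0, offset: '30' }],
  })
  await admin.disconnect()
}

run().catch(console.error)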

@@ -762,3 +762,35 @@ const Long = require('../utils/long')

/**
* Send request to delete records
* @public
* @param {Array<Object>} topics
* [
* {
* topic: 'my-topic-name',
* partitions: [
* { partition: 0, offset: 2 },
* { partition: 1, offset: 4 },
* ],
* }
* ]
* @returns {Promise<Array>} example:
* {
* throttleTime: 0
* [
* {
* topic: 'my-topic-name',
* partitions: [
* { partition: 0, lowWatermark: '2n', errorCode: 0 },
* { partition: 1, lowWatermark: '4n', errorCode: 0 },
* ],
* },
* ]
* }
*/
async deleteRecords({ topics }) {
const deleteRecords = this.lookupRequest(apiKeys.DeleteRecords, requests.DeleteRecords)
return await this[PRIVATE.SEND_REQUEST](deleteRecords({ topics }))
}
/**
* @public
* @param {Array} ACL e.g:

@@ -765,0 +797,0 @@ * [
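
A sketch of driving this broker-level request directly, following the JSDoc payload shape above (the broker instance is assumed to come from cluster.findBroker({ nodeId }), as in the admin code; the topic name and offsets are illustrative):

// Illustrative only: issues a DeleteRecords request against a single broker.
const deleteRecordsOnBroker = async (broker, topic) => {
  const response = await broker.deleteRecords({
    topics: [
      {
        topic, // e.g. 'my-topic-name'
        partitions: [
          { partition: 0, offset: '2' },
          { partition: 1, offset: '4' },
        ],
      },
    ],
  })
  // Each partition in the response reports its new lowWatermark and an errorCode
  return response
}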

@@ -183,2 +183,17 @@ const pkgJson = require('../package.json')

class KafkaJSDeleteTopicRecordsError extends KafkaJSError {
constructor({ partitions }) {
/*
* This error is retriable if all the errors were retriable
*/
const retriable = partitions
.filter(({ error }) => error != null)
.every(({ error }) => error.retriable === true)
super('Error while deleting records', { retriable })
this.name = 'KafkaJSDeleteTopicRecordsError'
this.partitions = partitions
}
}
const issueUrl = pkgJson.bugs.url

@@ -220,3 +235,4 @@

KafkaJSUnsupportedMagicByteInMessageSet,
KafkaJSDeleteTopicRecordsError,
KafkaJSInvariantViolation,
}
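
A hedged sketch of how a caller might inspect this error when a delete partially fails (the connected admin instance and topic are hypothetical; the partitions shape follows the class above and the typings later in this diff):

const deleteWithReporting = async (admin, topic) => {
  try {
    await admin.deleteTopicRecords({
      topic,
      partitions: [{ partition: 0, offset: '30' }],
    })
  } catch (e) {
    if (e.name === 'KafkaJSDeleteTopicRecordsError') {
      // Each entry carries the partition, the requested offset and the per-partition error
      e.partitions.forEach(({ partition, offset, error }) => {
        console.error(`Failed to delete records on partition ${partition} up to offset ${offset}: ${error.message}`)
      })
    }
    throw e
  }
}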
const createRetry = require('../retry')
const flatten = require('../utils/flatten')
const { KafkaJSMetadataNotLoaded } = require('../errors')
const { staleMetadata } = require('../protocol/error')
const groupMessagesPerPartition = require('./groupMessagesPerPartition')

@@ -10,6 +11,2 @@ const createTopicData = require('./createTopicData')

const TOTAL_INDIVIDUAL_ATTEMPTS = 5
const staleMetadata = e =>
['UNKNOWN_TOPIC_OR_PARTITION', 'LEADER_NOT_AVAILABLE', 'NOT_LEADER_FOR_PARTITION'].includes(
e.type
)

@@ -16,0 +13,0 @@ module.exports = ({ logger, cluster, partitioner, eosManager }) => {

@@ -590,2 +590,7 @@ const { KafkaJSProtocolError } = require('../errors')

const staleMetadata = e =>
['UNKNOWN_TOPIC_OR_PARTITION', 'LEADER_NOT_AVAILABLE', 'NOT_LEADER_FOR_PARTITION'].includes(
e.type
)
module.exports = {

@@ -596,2 +601,3 @@ failure,

failIfVersionNotSupported,
staleMetadata,
}
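
For illustration, a minimal sketch of how this now-shared predicate is consumed (mirroring the retry handling in the admin code above; the error object is assumed to be a broker protocol error carrying a type field):

const { staleMetadata } = require('./protocol/error') // path assumed relative to src/

// True when the protocol error indicates our cached metadata is out of date
// (UNKNOWN_TOPIC_OR_PARTITION, LEADER_NOT_AVAILABLE or NOT_LEADER_FOR_PARTITION),
// in which case callers refresh cluster metadata before retrying.
const shouldRefreshMetadata = error =>
  staleMetadata(error) || error.name === 'KafkaJSMetadataNotLoaded'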

@@ -26,3 +26,3 @@ const apiKeys = require('./apiKeys')

DeleteTopics: require('./deleteTopics'),
DeleteRecords: {},
DeleteRecords: require('./deleteRecords'),
InitProducerId: require('./initProducerId'),

@@ -29,0 +29,0 @@ OffsetForLeaderEpoch: {},

@@ -458,2 +458,3 @@ /// <reference types="node" />

createAcls(options: { acls: AclEntry[] }): Promise<boolean>
deleteTopicRecords(options: { topic: string; partitions: SeekEntry[] }): Promise<void>
logger(): Logger

@@ -968,2 +969,6 @@ on(

export class KafkaJSDeleteTopicRecordsError extends KafkaJSError {
constructor(metadata: KafkaJSDeleteTopicRecordsErrorTopic)
}
export interface KafkaJSDeleteGroupsErrorGroups {

@@ -975,2 +980,14 @@ groupId: string

export interface KafkaJSDeleteTopicRecordsErrorTopic {
topic: string
partitions: KafkaJSDeleteTopicRecordsErrorPartition[]
}
export interface KafkaJSDeleteTopicRecordsErrorPartition {
partition: number
offset: string
error: KafkaJSError
}
export interface KafkaJSErrorMetadata {

@@ -977,0 +994,0 @@ retriable?: boolean