New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

kafkajs

Package Overview
Dependencies
Maintainers
2
Versions
299
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

kafkajs - npm Package Compare versions

Comparing version 1.6.0 to 1.7.0

11

CHANGELOG.md

@@ -8,2 +8,13 @@ # Changelog

## [1.7.0] - 2019-04-12
### Fixed
- Improve compatibility with terserjs #338
### Added
- Add `admin#fetchTopicMetadata` #331
### Changed
- Deprecated `admin#getTopicMetadata` #331
- `admin#fetchTopicOffsets` returns the low and high watermarks #333
## [1.6.0] - 2019-04-01

@@ -10,0 +21,0 @@ ### Added

3

package.json
{
"name": "kafkajs",
"version": "1.6.0",
"version": "1.7.0",
"description": "A modern Apache Kafka client for node.js",

@@ -62,2 +62,3 @@ "author": "Tulio Ornelas <ornelas.tulio@gmail.com>",

"prettier": "^1.15.2",
"semver": "^6.0.0",
"uuid": "^3.3.2"

@@ -64,0 +65,0 @@ },

@@ -155,3 +155,3 @@ const createRetry = require('../retry')

} catch (e) {
if (e.type === 'NOT_CONTROLLER') {
if (['NOT_CONTROLLER', 'UNKNOWN_TOPIC_OR_PARTITION'].includes(e.type)) {
logger.warn('Could not delete topics', { error: e.message, retryCount, retryTime })

@@ -194,5 +194,6 @@ throw e

const metadata = cluster.findTopicPartitionMetadata(topic)
const offsets = await cluster.fetchTopicsOffset([
const high = await cluster.fetchTopicsOffset([
{
topic,
fromBeginning: false,
partitions: metadata.map(p => ({ partition: p.partitionId })),

@@ -202,4 +203,19 @@ },

const { partitions } = offsets.pop()
return partitions.map(({ partition, offset }) => ({ partition, offset }))
const low = await cluster.fetchTopicsOffset([
{
topic,
fromBeginning: true,
partitions: metadata.map(p => ({ partition: p.partitionId })),
},
])
const { partitions: highPartitions } = high.pop()
const { partitions: lowPartitions } = low.pop()
return highPartitions.map(({ partition, offset }) => ({
partition,
offset,
high: offset,
low: lowPartitions.find(({ partition: lowPartition }) => lowPartition === partition)
.offset,
}))
} catch (e) {

@@ -475,2 +491,6 @@ if (e.type === 'UNKNOWN_TOPIC_OR_PARTITION') {

/**
* @deprecated - This method was replaced by `fetchTopicMetadata`. This implementation
* is limited by the topics in the target group, so it can't fetch all topics when
* necessary.
*
* Fetch metadata for provided topics.

@@ -533,2 +553,47 @@ *

/**
* Fetch metadata for provided topics.
*
* If no topics are provided fetch metadata for all topics.
* @see https://kafka.apache.org/protocol#The_Messages_Metadata
*
* @param {Object} [options]
* @param {string[]} [options.topics]
* @return {Promise<TopicsMetadata>}
*
* @typedef {Object} TopicsMetadata
* @property {Array<TopicMetadata>} topics
*
* @typedef {Object} TopicMetadata
* @property {String} name
* @property {Array<PartitionMetadata>} partitions
*
* @typedef {Object} PartitionMetadata
* @property {number} partitionErrorCode Response error code
* @property {number} partitionId Topic partition id
* @property {number} leader The id of the broker acting as leader for this partition.
* @property {Array<number>} replicas The set of all nodes that host this partition.
* @property {Array<number>} isr The set of nodes that are in sync with the leader for this partition.
*/
const fetchTopicMetadata = async ({ topics = [] } = {}) => {
if (topics) {
await Promise.all(
topics.map(async topic => {
if (!topic) {
throw new KafkaJSNonRetriableError(`Invalid topic ${topic}`)
}
})
)
}
const metadata = await cluster.metadata({ topics })
return {
topics: metadata.topicMetadata.map(topicMetadata => ({
name: topicMetadata.topic,
partitions: topicMetadata.partitionMetadata,
})),
}
}
/**
* @param {string} eventName

@@ -565,2 +630,3 @@ * @param {Function} listener

getTopicMetadata,
fetchTopicMetadata,
events,

@@ -567,0 +633,0 @@ fetchOffsets,

@@ -130,2 +130,21 @@ const BrokerPool = require('./brokerPool')

* @public
* @returns {Promise<Metadata>}
*/
async metadata({ topics = [] } = {}) {
return this.retrier(async (bail, retryCount, retryTime) => {
try {
await this.brokerPool.refreshMetadataIfNecessary(topics)
return this.brokerPool.withBroker(async ({ broker }) => broker.metadata(topics))
} catch (e) {
if (e.type === 'LEADER_NOT_AVAILABLE') {
throw e
}
bail(e)
}
})
}
/**
* @public
* @param {string} topic

@@ -132,0 +151,0 @@ * @return {Promise}

@@ -16,3 +16,3 @@ const { EventEmitter } = require('events')

const guard = (object, method, { legalStates, async = true }) => {
const guard = (object, method, { legalStates, async: isAsync = true }) => {
if (!object[method]) {

@@ -30,3 +30,3 @@ throw new KafkaJSNonRetriableError(`Cannot add guard on missing method "${method}"`)

if (async) {
if (isAsync) {
return Promise.reject(error)

@@ -33,0 +33,0 @@ } else {

@@ -15,4 +15,4 @@ const Encoder = require('../../../encoder')

encode: async () => {
return new Encoder().writeArray(topics).writeBoolean(allowAutoTopicCreation)
return new Encoder().writeNullableArray(topics).writeBoolean(allowAutoTopicCreation)
},
})
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc