Comparing version 1.6.0 to 1.7.0
@@ -8,2 +8,13 @@ # Changelog | ||
## [1.7.0] - 2019-04-12 | ||
### Fixed | ||
- Improve compatibility with terserjs #338 | ||
### Added | ||
- Add `admin#fetchTopicMetadata` #331 | ||
### Changed | ||
- Deprecated `admin#getTopicMetadata` #331 | ||
- `admin#fetchTopicOffsets` returns the low and high watermarks #333 | ||
## [1.6.0] - 2019-04-01 | ||
@@ -10,0 +21,0 @@ ### Added |
{ | ||
"name": "kafkajs", | ||
"version": "1.6.0", | ||
"version": "1.7.0", | ||
"description": "A modern Apache Kafka client for node.js", | ||
@@ -62,2 +62,3 @@ "author": "Tulio Ornelas <ornelas.tulio@gmail.com>", | ||
"prettier": "^1.15.2", | ||
"semver": "^6.0.0", | ||
"uuid": "^3.3.2" | ||
@@ -64,0 +65,0 @@ }, |
@@ -155,3 +155,3 @@ const createRetry = require('../retry') | ||
} catch (e) { | ||
if (e.type === 'NOT_CONTROLLER') { | ||
if (['NOT_CONTROLLER', 'UNKNOWN_TOPIC_OR_PARTITION'].includes(e.type)) { | ||
logger.warn('Could not delete topics', { error: e.message, retryCount, retryTime }) | ||
@@ -194,5 +194,6 @@ throw e | ||
const metadata = cluster.findTopicPartitionMetadata(topic) | ||
const offsets = await cluster.fetchTopicsOffset([ | ||
const high = await cluster.fetchTopicsOffset([ | ||
{ | ||
topic, | ||
fromBeginning: false, | ||
partitions: metadata.map(p => ({ partition: p.partitionId })), | ||
@@ -202,4 +203,19 @@ }, | ||
const { partitions } = offsets.pop() | ||
return partitions.map(({ partition, offset }) => ({ partition, offset })) | ||
const low = await cluster.fetchTopicsOffset([ | ||
{ | ||
topic, | ||
fromBeginning: true, | ||
partitions: metadata.map(p => ({ partition: p.partitionId })), | ||
}, | ||
]) | ||
const { partitions: highPartitions } = high.pop() | ||
const { partitions: lowPartitions } = low.pop() | ||
return highPartitions.map(({ partition, offset }) => ({ | ||
partition, | ||
offset, | ||
high: offset, | ||
low: lowPartitions.find(({ partition: lowPartition }) => lowPartition === partition) | ||
.offset, | ||
})) | ||
} catch (e) { | ||
@@ -475,2 +491,6 @@ if (e.type === 'UNKNOWN_TOPIC_OR_PARTITION') { | ||
/** | ||
* @deprecated - This method was replaced by `fetchTopicMetadata`. This implementation | ||
* is limited by the topics in the target group, so it can't fetch all topics when | ||
* necessary. | ||
* | ||
* Fetch metadata for provided topics. | ||
@@ -533,2 +553,47 @@ * | ||
/** | ||
* Fetch metadata for provided topics. | ||
* | ||
* If no topics are provided fetch metadata for all topics. | ||
* @see https://kafka.apache.org/protocol#The_Messages_Metadata | ||
* | ||
* @param {Object} [options] | ||
* @param {string[]} [options.topics] | ||
* @return {Promise<TopicsMetadata>} | ||
* | ||
* @typedef {Object} TopicsMetadata | ||
* @property {Array<TopicMetadata>} topics | ||
* | ||
* @typedef {Object} TopicMetadata | ||
* @property {String} name | ||
* @property {Array<PartitionMetadata>} partitions | ||
* | ||
* @typedef {Object} PartitionMetadata | ||
* @property {number} partitionErrorCode Response error code | ||
* @property {number} partitionId Topic partition id | ||
* @property {number} leader The id of the broker acting as leader for this partition. | ||
* @property {Array<number>} replicas The set of all nodes that host this partition. | ||
* @property {Array<number>} isr The set of nodes that are in sync with the leader for this partition. | ||
*/ | ||
const fetchTopicMetadata = async ({ topics = [] } = {}) => { | ||
if (topics) { | ||
await Promise.all( | ||
topics.map(async topic => { | ||
if (!topic) { | ||
throw new KafkaJSNonRetriableError(`Invalid topic ${topic}`) | ||
} | ||
}) | ||
) | ||
} | ||
const metadata = await cluster.metadata({ topics }) | ||
return { | ||
topics: metadata.topicMetadata.map(topicMetadata => ({ | ||
name: topicMetadata.topic, | ||
partitions: topicMetadata.partitionMetadata, | ||
})), | ||
} | ||
} | ||
/** | ||
* @param {string} eventName | ||
@@ -565,2 +630,3 @@ * @param {Function} listener | ||
getTopicMetadata, | ||
fetchTopicMetadata, | ||
events, | ||
@@ -567,0 +633,0 @@ fetchOffsets, |
@@ -130,2 +130,21 @@ const BrokerPool = require('./brokerPool') | ||
* @public | ||
* @returns {Promise<Metadata>} | ||
*/ | ||
async metadata({ topics = [] } = {}) { | ||
return this.retrier(async (bail, retryCount, retryTime) => { | ||
try { | ||
await this.brokerPool.refreshMetadataIfNecessary(topics) | ||
return this.brokerPool.withBroker(async ({ broker }) => broker.metadata(topics)) | ||
} catch (e) { | ||
if (e.type === 'LEADER_NOT_AVAILABLE') { | ||
throw e | ||
} | ||
bail(e) | ||
} | ||
}) | ||
} | ||
/** | ||
* @public | ||
* @param {string} topic | ||
@@ -132,0 +151,0 @@ * @return {Promise} |
@@ -16,3 +16,3 @@ const { EventEmitter } = require('events') | ||
const guard = (object, method, { legalStates, async = true }) => { | ||
const guard = (object, method, { legalStates, async: isAsync = true }) => { | ||
if (!object[method]) { | ||
@@ -30,3 +30,3 @@ throw new KafkaJSNonRetriableError(`Cannot add guard on missing method "${method}"`) | ||
if (async) { | ||
if (isAsync) { | ||
return Promise.reject(error) | ||
@@ -33,0 +33,0 @@ } else { |
@@ -15,4 +15,4 @@ const Encoder = require('../../../encoder') | ||
encode: async () => { | ||
return new Encoder().writeArray(topics).writeBoolean(allowAutoTopicCreation) | ||
return new Encoder().writeNullableArray(topics).writeBoolean(allowAutoTopicCreation) | ||
}, | ||
}) |
391141
11557
20