Comparing version
{ | ||
"name": "kafkajs", | ||
"version": "1.13.0-beta.9", | ||
"version": "1.13.0-beta.10", | ||
"description": "A modern Apache Kafka client for node.js", | ||
@@ -82,5 +82,5 @@ "author": "Tulio Ornelas <ornelas.tulio@gmail.com>", | ||
"kafkajs": { | ||
"sha": "03409e2af6d8622b5e65c21c63a750cb938a8587", | ||
"compare": "https://github.com/tulios/kafkajs/compare/v1.12.0...03409e2af6d8622b5e65c21c63a750cb938a8587" | ||
"sha": "897715fcc812820284c5ef3d76bd07bf4fac1478", | ||
"compare": "https://github.com/tulios/kafkajs/compare/v1.12.0...897715fcc812820284c5ef3d76bd07bf4fac1478" | ||
} | ||
} |
const BrokerPool = require('./brokerPool') | ||
const Lock = require('../utils/lock') | ||
const createRetry = require('../retry') | ||
@@ -81,2 +82,6 @@ const connectionBuilder = require('./connectionBuilder') | ||
this.targetTopics = new Set() | ||
this.mutatingTargetTopics = new Lock({ | ||
description: `updating target topics`, | ||
timeout: requestTimeout, | ||
}) | ||
this.isolationLevel = isolationLevel | ||
@@ -166,11 +171,26 @@ this.brokerPool = new BrokerPool({ | ||
async addMultipleTargetTopics(topics) { | ||
const previousSize = this.targetTopics.size | ||
for (const topic of topics) { | ||
this.targetTopics.add(topic) | ||
} | ||
await this.mutatingTargetTopics.acquire() | ||
const hasChanged = previousSize !== this.targetTopics.size || !this.brokerPool.metadata | ||
try { | ||
const previousSize = this.targetTopics.size | ||
const previousTopics = new Set(this.targetTopics) | ||
for (const topic of topics) { | ||
this.targetTopics.add(topic) | ||
} | ||
if (hasChanged) { | ||
await this.refreshMetadata() | ||
const hasChanged = previousSize !== this.targetTopics.size || !this.brokerPool.metadata | ||
if (hasChanged) { | ||
try { | ||
await this.refreshMetadata() | ||
} catch (e) { | ||
if (e.type === 'INVALID_TOPIC_EXCEPTION') { | ||
this.targetTopics = previousTopics | ||
} | ||
throw e | ||
} | ||
} | ||
} finally { | ||
await this.mutatingTargetTopics.release() | ||
} | ||
@@ -177,0 +197,0 @@ } |
505833
0.11%14954
0.12%