Comparing version 3.3.5 to 3.4.0
@@ -121,3 +121,3 @@ 'use strict'; | ||
return self.client.updateMetadata().then(function () { | ||
return self.client.updateMetadata([topic]).then(function () { | ||
var s = self.subscriptions[topic + ':' + partition]; | ||
@@ -124,0 +124,0 @@ |
@@ -136,5 +136,2 @@ 'use strict'; | ||
} | ||
}) | ||
.then(function () { | ||
return self.updateMetadata(); | ||
}); | ||
@@ -226,3 +223,3 @@ }; | ||
Client.prototype.updateMetadata = function () { | ||
Client.prototype.updateMetadata = function (topicNames) { | ||
var self = this, attempt = 1; | ||
@@ -239,3 +236,3 @@ | ||
return self.metadataRequest().then(function (response) { | ||
return self.metadataRequest(topicNames).then(function (response) { | ||
var oldConnections = self.brokerConnections; | ||
@@ -335,3 +332,3 @@ | ||
.catch({ code: 'UnknownTopicOrPartition' }, function () { | ||
return self.updateMetadata().then(_try); | ||
return self.updateMetadata([topic]).then(_try); | ||
}) | ||
@@ -341,3 +338,3 @@ .then(_.values); | ||
Client.prototype.findLeader = function (topic, partition, notfoundOK) { | ||
Client.prototype.findLeader = function (topic, partition) { | ||
var self = this; | ||
@@ -348,5 +345,2 @@ | ||
if (r === -1) { | ||
if (notfoundOK) { | ||
return parseInt(_.keys(self.brokerConnections)[0]); | ||
} | ||
throw errors.byName('UnknownTopicOrPartition'); | ||
@@ -362,3 +356,3 @@ } | ||
.catch({ code: 'UnknownTopicOrPartition' }, { code: 'LeaderNotAvailable' }, function () { | ||
return self.updateMetadata().then(_try); | ||
return self.updateMetadata([topic]).then(_try); | ||
}); | ||
@@ -770,3 +764,3 @@ }; | ||
return self._waitMetadata().then(function () { | ||
return self.updateMetadata().then(function () { | ||
buffer = self.protocol.write().ListGroupsRequest({ | ||
@@ -773,0 +767,0 @@ correlationId: correlationId, |
@@ -31,2 +31,3 @@ 'use strict'; | ||
this.members = null; | ||
this.topics = []; | ||
@@ -78,2 +79,3 @@ this.strategyName = null; // current strategy assigned by group coordinator | ||
self.strategies[s.name] = s; | ||
self.topics = self.topics.concat(s.subscriptions); | ||
}); | ||
@@ -121,3 +123,3 @@ | ||
if (self.memberId === self.leaderId) { // leader should generate group assignments | ||
return self.client.updateMetadata().then(function () { | ||
return self.client.updateMetadata(self.topics).then(function () { | ||
var r = []; | ||
@@ -318,3 +320,3 @@ _.each(self.members, function (member) { | ||
return self.client.updateMetadata().then(function () { | ||
return self.client.updateMetadata(self.topics).then(function () { | ||
return self.fetchOffset(offsetRequests).map(function (p) { | ||
@@ -321,0 +323,0 @@ var options = { |
@@ -72,3 +72,8 @@ 'use strict'; | ||
.then(function () { | ||
return self.client.findLeader(d.topic, d.partition, true).then(function (leader) { | ||
return self.client.findLeader(d.topic, d.partition) | ||
.catch({ code: 'UnknownTopicOrPartition' }, function (err) { | ||
d.error = err; | ||
return -1; | ||
}) | ||
.then(function (leader) { | ||
d.leader = leader; | ||
@@ -87,2 +92,6 @@ }); | ||
function _errored(r) { | ||
return r.error !== undefined; | ||
} | ||
(function _try(_data, attempt) { | ||
@@ -92,14 +101,12 @@ attempt = attempt || 1; | ||
return self._prepareProduceRequest(_data).then(function (requests) { | ||
return self.client.produceRequest(requests, task.options.codec).then(function (response) { | ||
var toRetry = []; | ||
if (_.isEmpty(response)) { // if requiredAcks = 0 | ||
return response; | ||
} | ||
var toRetry = _.filter(requests, _errored); | ||
return self.client.produceRequest(_.reject(requests, _errored), task.options.codec).then(function (response) { | ||
return Promise.map(response, function (p) { | ||
var failed; | ||
if (p.error) { | ||
if ((/UnknownTopicOrPartition|NotLeaderForPartition|LeaderNotAvailable/.test(p.error.code) | ||
|| p.error instanceof errors.NoKafkaConnectionError) | ||
&& attempt < task.options.retries.attempts) { | ||
// self.client.debug('Received', p.error, 'for', p.topic + ':' + p.partition); | ||
toRetry = toRetry.concat(_.filter(_data, { topic: p.topic, partition: p.partition })); | ||
|| p.error instanceof errors.NoKafkaConnectionError)) { | ||
failed = _.filter(_data, { topic: p.topic, partition: p.partition }); | ||
failed = _.map(failed, function (f) { f.error = p.error; }); | ||
toRetry = toRetry.concat(failed); | ||
} else { | ||
@@ -114,10 +121,11 @@ result.push(p); | ||
var delay; | ||
if (toRetry.length) { | ||
if (toRetry.length && attempt < task.options.retries.attempts) { | ||
delay = _.min([attempt * task.options.retries.delay.min, task.options.retries.delay.max]); | ||
return Promise.delay(delay).then(function () { | ||
return self.client.updateMetadata().then(function () { | ||
return _try(toRetry, ++attempt); | ||
}); | ||
return _try(toRetry, ++attempt); | ||
}); | ||
} | ||
_.each(toRetry, function (r) { | ||
result.push({ error: r.error }); | ||
}); | ||
return null; | ||
@@ -124,0 +132,0 @@ }); |
@@ -9,3 +9,3 @@ { | ||
}, | ||
"version": "3.3.5", | ||
"version": "3.4.0", | ||
"main": "./lib/index.js", | ||
@@ -38,3 +38,3 @@ "types": "./types/index.d.ts", | ||
"sinon-chai": "^2.10.0", | ||
"snappy": "^5.0.0" | ||
"snappy": "^6.1.1" | ||
}, | ||
@@ -41,0 +41,0 @@ "bugs": { |
Sorry, the diff of this file is not supported yet
419159
5991