Comparing version 1.8.0 to 1.8.1
@@ -8,2 +8,6 @@ # Changelog | ||
## [1.8.1] - 2019-06-25 | ||
### Fixed | ||
- Make sure runner has connected brokers and fresh metadata before it starts #404 | ||
## [1.8.0] - 2019-05-13 | ||
@@ -10,0 +14,0 @@ ### Added |
{ | ||
"name": "kafkajs", | ||
"version": "1.8.0", | ||
"version": "1.8.1", | ||
"description": "A modern Apache Kafka client for node.js", | ||
@@ -5,0 +5,0 @@ "author": "Tulio Ornelas <ornelas.tulio@gmail.com>", |
@@ -184,3 +184,7 @@ const BrokerPool = require('./brokerPool') | ||
// The client probably has stale metadata | ||
if (e.name === 'KafkaJSLockTimeout' || e.code === 'ECONNREFUSED') { | ||
if ( | ||
e.name === 'KafkaJSBrokerNotFound' || | ||
e.name === 'KafkaJSLockTimeout' || | ||
e.code === 'ECONNREFUSED' | ||
) { | ||
await this.refreshMetadata() | ||
@@ -187,0 +191,0 @@ } |
@@ -84,2 +84,7 @@ const flatten = require('../utils/flatten') | ||
async connect() { | ||
await this.cluster.connect() | ||
await this.cluster.refreshMetadataIfNecessary() | ||
} | ||
async join() { | ||
@@ -86,0 +91,0 @@ const { groupId, sessionTimeout, rebalanceTimeout } = this |
@@ -253,16 +253,26 @@ const Long = require('long') | ||
const coordinator = await this.getCoordinator() | ||
await coordinator.offsetCommit(payload) | ||
this.instrumentationEmitter.emit(COMMIT_OFFSETS, payload) | ||
try { | ||
const coordinator = await this.getCoordinator() | ||
await coordinator.offsetCommit(payload) | ||
this.instrumentationEmitter.emit(COMMIT_OFFSETS, payload) | ||
// Update local reference of committed offsets | ||
topics.forEach(({ topic, partitions }) => { | ||
const updatedOffsets = partitions.reduce( | ||
(obj, { partition, offset }) => assign(obj, { [partition]: offset }), | ||
{} | ||
) | ||
assign(this.committedOffsets()[topic], updatedOffsets) | ||
}) | ||
// Update local reference of committed offsets | ||
topics.forEach(({ topic, partitions }) => { | ||
const updatedOffsets = partitions.reduce( | ||
(obj, { partition, offset }) => assign(obj, { [partition]: offset }), | ||
{} | ||
) | ||
assign(this.committedOffsets()[topic], updatedOffsets) | ||
}) | ||
this.lastCommit = Date.now() | ||
this.lastCommit = Date.now() | ||
} catch (e) { | ||
// metadata is stale, the coordinator has changed due to a restart or | ||
// broker reassignment | ||
if (e.type === 'NOT_COORDINATOR_FOR_GROUP') { | ||
await this.cluster.refreshMetadata() | ||
} | ||
throw e | ||
} | ||
} | ||
@@ -269,0 +279,0 @@ |
@@ -86,2 +86,3 @@ const createRetry = require('../retry') | ||
try { | ||
await this.consumerGroup.connect() | ||
await this.join() | ||
@@ -88,0 +89,0 @@ |
436062
12983