Comparing version 1.12.0-beta.11 to 1.12.0-beta.12
{ | ||
"name": "kafkajs", | ||
"version": "1.12.0-beta.11", | ||
"version": "1.12.0-beta.12", | ||
"description": "A modern Apache Kafka client for node.js", | ||
@@ -26,3 +26,3 @@ "author": "Tulio Ornelas <ornelas.tulio@gmail.com>", | ||
"scripts": { | ||
"jest": "export KAFKA_VERSION=${KAFKA_VERSION:='2.2'} && NODE_ENV=test echo \"KAFKA_VERSION: ${KAFKA_VERSION}\" && KAFKAJS_DEBUG_PROTOCOL_BUFFERS=1 ./node_modules/.bin/jest", | ||
"jest": "export KAFKA_VERSION=${KAFKA_VERSION:='2.3'} && NODE_ENV=test echo \"KAFKA_VERSION: ${KAFKA_VERSION}\" && KAFKAJS_DEBUG_PROTOCOL_BUFFERS=1 ./node_modules/.bin/jest", | ||
"test:local": "yarn jest --detectOpenHandles", | ||
@@ -82,5 +82,5 @@ "test:debug": "NODE_ENV=test KAFKAJS_DEBUG_PROTOCOL_BUFFERS=1 node --inspect-brk node_modules/.bin/jest --detectOpenHandles --runInBand --watch", | ||
"kafkajs": { | ||
"sha": "6002347daf0a07353347664c505a536c79538721", | ||
"compare": "https://github.com/tulios/kafkajs/compare/v1.11.0...6002347daf0a07353347664c505a536c79538721" | ||
"sha": "b15e0f15325553e60182ee733e196671e5b97b72", | ||
"compare": "https://github.com/tulios/kafkajs/compare/v1.11.0...b15e0f15325553e60182ee733e196671e5b97b72" | ||
} | ||
} |
@@ -11,4 +11,5 @@ const Long = require('long') | ||
constructor(topic, fetchedOffset, partitionData) { | ||
const longFetchedOffset = Long.fromValue(fetchedOffset) | ||
const { abortedTransactions } = partitionData | ||
this.fetchedOffset = fetchedOffset | ||
const longFetchedOffset = Long.fromValue(this.fetchedOffset) | ||
const { abortedTransactions, messages } = partitionData | ||
@@ -19,9 +20,9 @@ this.topic = topic | ||
this.rawMessages = messages | ||
// Apparently fetch can return different offsets than the target offset provided to the fetch API. | ||
// Discard messages that are not in the requested offset | ||
// https://github.com/apache/kafka/blob/bf237fa7c576bd141d78fdea9f17f65ea269c290/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L912 | ||
const messagesWithinOffset = partitionData.messages.filter(message => | ||
this.messagesWithinOffset = this.rawMessages.filter(message => | ||
Long.fromValue(message.offset).gte(longFetchedOffset) | ||
) | ||
this.unfilteredMessages = messagesWithinOffset | ||
@@ -32,3 +33,3 @@ // 1. Don't expose aborted messages | ||
this.messages = filterAbortedMessages({ | ||
messages: messagesWithinOffset, | ||
messages: this.messagesWithinOffset, | ||
abortedTransactions, | ||
@@ -43,19 +44,42 @@ }).filter(message => !message.isControlRecord) | ||
isEmptyIncludingFiltered() { | ||
return this.unfilteredMessages.length === 0 | ||
return this.messagesWithinOffset.length === 0 | ||
} | ||
isEmptyControlRecord() { | ||
return this.isEmpty() && this.unfilteredMessages.some(({ isControlRecord }) => isControlRecord) | ||
return ( | ||
this.isEmpty() && this.messagesWithinOffset.some(({ isControlRecord }) => isControlRecord) | ||
) | ||
} | ||
/** | ||
* With compressed messages, it's possible for the returned messages to have offsets smaller than the starting offset. | ||
* These messages will be filtered out (i.e. they are not even included in this.messagesWithinOffset) | ||
* If these are the only messages, the batch will appear as an empty batch. | ||
* | ||
* isEmpty() and isEmptyIncludingFiltered() will always return true if the batch is empty, | ||
* but this method will only return true if the batch is empty due to log compacted messages. | ||
* | ||
* @returns boolean True if the batch is empty, because of log compacted messages in the partition. | ||
*/ | ||
isEmptyDueToLogCompactedMessages() { | ||
const hasMessages = this.rawMessages.length > 0 | ||
return hasMessages && this.isEmptyIncludingFiltered() | ||
} | ||
firstOffset() { | ||
return this.isEmptyIncludingFiltered() ? null : this.unfilteredMessages[0].offset | ||
return this.isEmptyIncludingFiltered() ? null : this.messagesWithinOffset[0].offset | ||
} | ||
lastOffset() { | ||
return this.isEmptyIncludingFiltered() | ||
? Long.fromValue(this.highWatermark) | ||
.add(-1) | ||
.toString() | ||
: this.unfilteredMessages[this.unfilteredMessages.length - 1].offset | ||
if (this.isEmptyDueToLogCompactedMessages()) { | ||
return this.fetchedOffset | ||
} | ||
if (this.isEmptyIncludingFiltered()) { | ||
return Long.fromValue(this.highWatermark) | ||
.add(-1) | ||
.toString() | ||
} | ||
return this.messagesWithinOffset[this.messagesWithinOffset.length - 1].offset | ||
} | ||
@@ -67,6 +91,2 @@ | ||
offsetLag() { | ||
if (this.isEmptyIncludingFiltered()) { | ||
return '0' | ||
} | ||
const lastOffsetOfPartition = Long.fromValue(this.highWatermark).add(-1) | ||
@@ -73,0 +93,0 @@ const lastConsumedOffset = Long.fromValue(this.lastOffset()) |
@@ -440,3 +440,3 @@ const flatten = require('../utils/flatten') | ||
*/ | ||
if (batch.isEmptyControlRecord()) { | ||
if (batch.isEmptyControlRecord() || batch.isEmptyDueToLogCompactedMessages()) { | ||
this.resolveOffset({ | ||
@@ -443,0 +443,0 @@ topic: batch.topic, |
481143
14198