New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

kafkajs

Package Overview
Dependencies
Maintainers
2
Versions
299
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

kafkajs - npm Package Compare versions

Comparing version 1.12.0-beta.11 to 1.12.0-beta.12

8

package.json
{
"name": "kafkajs",
"version": "1.12.0-beta.11",
"version": "1.12.0-beta.12",
"description": "A modern Apache Kafka client for node.js",

@@ -26,3 +26,3 @@ "author": "Tulio Ornelas <ornelas.tulio@gmail.com>",

"scripts": {
"jest": "export KAFKA_VERSION=${KAFKA_VERSION:='2.2'} && NODE_ENV=test echo \"KAFKA_VERSION: ${KAFKA_VERSION}\" && KAFKAJS_DEBUG_PROTOCOL_BUFFERS=1 ./node_modules/.bin/jest",
"jest": "export KAFKA_VERSION=${KAFKA_VERSION:='2.3'} && NODE_ENV=test echo \"KAFKA_VERSION: ${KAFKA_VERSION}\" && KAFKAJS_DEBUG_PROTOCOL_BUFFERS=1 ./node_modules/.bin/jest",
"test:local": "yarn jest --detectOpenHandles",

@@ -82,5 +82,5 @@ "test:debug": "NODE_ENV=test KAFKAJS_DEBUG_PROTOCOL_BUFFERS=1 node --inspect-brk node_modules/.bin/jest --detectOpenHandles --runInBand --watch",

"kafkajs": {
"sha": "6002347daf0a07353347664c505a536c79538721",
"compare": "https://github.com/tulios/kafkajs/compare/v1.11.0...6002347daf0a07353347664c505a536c79538721"
"sha": "b15e0f15325553e60182ee733e196671e5b97b72",
"compare": "https://github.com/tulios/kafkajs/compare/v1.11.0...b15e0f15325553e60182ee733e196671e5b97b72"
}
}

@@ -11,4 +11,5 @@ const Long = require('long')

constructor(topic, fetchedOffset, partitionData) {
const longFetchedOffset = Long.fromValue(fetchedOffset)
const { abortedTransactions } = partitionData
this.fetchedOffset = fetchedOffset
const longFetchedOffset = Long.fromValue(this.fetchedOffset)
const { abortedTransactions, messages } = partitionData

@@ -19,9 +20,9 @@ this.topic = topic

this.rawMessages = messages
// Apparently fetch can return different offsets than the target offset provided to the fetch API.
// Discard messages that are not in the requested offset
// https://github.com/apache/kafka/blob/bf237fa7c576bd141d78fdea9f17f65ea269c290/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L912
const messagesWithinOffset = partitionData.messages.filter(message =>
this.messagesWithinOffset = this.rawMessages.filter(message =>
Long.fromValue(message.offset).gte(longFetchedOffset)
)
this.unfilteredMessages = messagesWithinOffset

@@ -32,3 +33,3 @@ // 1. Don't expose aborted messages

this.messages = filterAbortedMessages({
messages: messagesWithinOffset,
messages: this.messagesWithinOffset,
abortedTransactions,

@@ -43,19 +44,42 @@ }).filter(message => !message.isControlRecord)

isEmptyIncludingFiltered() {
return this.unfilteredMessages.length === 0
return this.messagesWithinOffset.length === 0
}
isEmptyControlRecord() {
return this.isEmpty() && this.unfilteredMessages.some(({ isControlRecord }) => isControlRecord)
return (
this.isEmpty() && this.messagesWithinOffset.some(({ isControlRecord }) => isControlRecord)
)
}
/**
* With compressed messages, it's possible for the returned messages to have offsets smaller than the starting offset.
* These messages will be filtered out (i.e. they are not even included in this.messagesWithinOffset)
* If these are the only messages, the batch will appear as an empty batch.
*
* isEmpty() and isEmptyIncludingFiltered() will always return true if the batch is empty,
* but this method will only return true if the batch is empty due to log compacted messages.
*
* @returns boolean True if the batch is empty, because of log compacted messages in the partition.
*/
isEmptyDueToLogCompactedMessages() {
const hasMessages = this.rawMessages.length > 0
return hasMessages && this.isEmptyIncludingFiltered()
}
firstOffset() {
return this.isEmptyIncludingFiltered() ? null : this.unfilteredMessages[0].offset
return this.isEmptyIncludingFiltered() ? null : this.messagesWithinOffset[0].offset
}
lastOffset() {
return this.isEmptyIncludingFiltered()
? Long.fromValue(this.highWatermark)
.add(-1)
.toString()
: this.unfilteredMessages[this.unfilteredMessages.length - 1].offset
if (this.isEmptyDueToLogCompactedMessages()) {
return this.fetchedOffset
}
if (this.isEmptyIncludingFiltered()) {
return Long.fromValue(this.highWatermark)
.add(-1)
.toString()
}
return this.messagesWithinOffset[this.messagesWithinOffset.length - 1].offset
}

@@ -67,6 +91,2 @@

offsetLag() {
if (this.isEmptyIncludingFiltered()) {
return '0'
}
const lastOffsetOfPartition = Long.fromValue(this.highWatermark).add(-1)

@@ -73,0 +93,0 @@ const lastConsumedOffset = Long.fromValue(this.lastOffset())

@@ -440,3 +440,3 @@ const flatten = require('../utils/flatten')

*/
if (batch.isEmptyControlRecord()) {
if (batch.isEmptyControlRecord() || batch.isEmptyDueToLogCompactedMessages()) {
this.resolveOffset({

@@ -443,0 +443,0 @@ topic: batch.topic,

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc