Comparing version
{ | ||
"name": "kafkajs", | ||
"version": "1.13.0-beta.11", | ||
"version": "1.13.0-beta.12", | ||
"description": "A modern Apache Kafka client for node.js", | ||
@@ -82,5 +82,5 @@ "author": "Tulio Ornelas <ornelas.tulio@gmail.com>", | ||
"kafkajs": { | ||
"sha": "2def29541d3b0b55df3cbbd71ac788cd4048e77b", | ||
"compare": "https://github.com/tulios/kafkajs/compare/v1.12.0...2def29541d3b0b55df3cbbd71ac788cd4048e77b" | ||
"sha": "9951162b69a710ed98a8967ca748eb0aa8f3af01", | ||
"compare": "https://github.com/tulios/kafkajs/compare/v1.12.0...9951162b69a710ed98a8967ca748eb0aa8f3af01" | ||
} | ||
} |
@@ -67,3 +67,6 @@ const createRetry = require('../retry') | ||
this.buffer = Buffer.alloc(0) | ||
this.bytesBuffered = 0 | ||
this.bytesNeeded = Decoder.int32Size() | ||
this.chunks = [] | ||
this.connected = false | ||
@@ -375,12 +378,18 @@ this.correlationId = 0 | ||
this.buffer = Buffer.concat([this.buffer, rawData]) | ||
// Accumulate the new chunk | ||
this.chunks.push(rawData) | ||
this.bytesBuffered += Buffer.byteLength(rawData) | ||
// Process data if there are enough bytes to read the expected response size, | ||
// otherwise keep buffering | ||
while (Buffer.byteLength(this.buffer) > Decoder.int32Size()) { | ||
const data = Buffer.from(this.buffer) | ||
const decoder = new Decoder(data) | ||
while (this.bytesNeeded <= this.bytesBuffered) { | ||
const buffer = this.chunks.length > 1 ? Buffer.concat(this.chunks) : this.chunks[0] | ||
const decoder = new Decoder(buffer) | ||
const expectedResponseSize = decoder.readInt32() | ||
// Return early if not enough bytes to read the full response | ||
if (!decoder.canReadBytes(expectedResponseSize)) { | ||
this.chunks = [buffer] | ||
this.bytesBuffered = Buffer.byteLength(buffer) | ||
this.bytesNeeded = Decoder.int32Size() + expectedResponseSize | ||
return | ||
@@ -390,8 +399,12 @@ } | ||
const response = new Decoder(decoder.readBytes(expectedResponseSize)) | ||
// Reset the buffer as the rest of the bytes | ||
this.buffer = decoder.readAll() | ||
// Reset the buffered chunks as the rest of the bytes | ||
const remainderBuffer = decoder.readAll() | ||
this.chunks = [remainderBuffer] | ||
this.bytesBuffered = Buffer.byteLength(remainderBuffer) | ||
this.bytesNeeded = Decoder.int32Size() | ||
if (this.authHandlers) { | ||
const rawResponseSize = Decoder.int32Size() + expectedResponseSize | ||
const rawResponseBuffer = data.slice(0, rawResponseSize) | ||
const rawResponseBuffer = buffer.slice(0, rawResponseSize) | ||
return this.authHandlers.onSuccess(rawResponseBuffer) | ||
@@ -398,0 +411,0 @@ } |
506425
0.11%14967
0.07%