Comparing version
{ | ||
"name": "kafkajs", | ||
"version": "1.13.0-beta.50", | ||
"version": "1.13.0-beta.51", | ||
"description": "A modern Apache Kafka client for node.js", | ||
@@ -82,5 +82,5 @@ "author": "Tulio Ornelas <ornelas.tulio@gmail.com>", | ||
"kafkajs": { | ||
"sha": "af684ba10ccecec7c23e812cdb742090971ba74b", | ||
"compare": "https://github.com/tulios/kafkajs/compare/v1.12.0...af684ba10ccecec7c23e812cdb742090971ba74b" | ||
"sha": "6b2d7235c84256a28149bc25c0e69d853b8f14b5", | ||
"compare": "https://github.com/tulios/kafkajs/compare/v1.12.0...6b2d7235c84256a28149bc25c0e69d853b8f14b5" | ||
} | ||
} |
@@ -57,10 +57,43 @@ const Long = require('long') | ||
constructor() { | ||
this.buffer = Buffer.alloc(0) | ||
static nextPowerOfTwo(value) { | ||
return 1 << (31 - Math.clz32(value) + 1) | ||
} | ||
/** | ||
* Construct a new encoder with the given initial size | ||
* | ||
* @param {number} [initialSize] initial size | ||
*/ | ||
constructor(initialSize = 511) { | ||
this.buf = Buffer.alloc(Encoder.nextPowerOfTwo(initialSize)) | ||
this.offset = 0 | ||
} | ||
/** | ||
* @param {Buffer} buffer | ||
*/ | ||
writeBufferInternal(buffer) { | ||
const bufferLength = buffer.length | ||
this.ensureAvailable(bufferLength) | ||
buffer.copy(this.buf, this.offset, 0) | ||
this.offset += bufferLength | ||
} | ||
ensureAvailable(length) { | ||
if (this.offset + length > this.buf.length) { | ||
const newLength = Encoder.nextPowerOfTwo(this.offset + length) | ||
const newBuffer = Buffer.alloc(newLength) | ||
this.buf.copy(newBuffer, 0, 0, this.offset) | ||
this.buf = newBuffer | ||
} | ||
} | ||
get buffer() { | ||
return this.buf.slice(0, this.offset) | ||
} | ||
writeInt8(value) { | ||
const tempBuffer = Buffer.alloc(INT8_SIZE) | ||
tempBuffer.writeInt8(value) | ||
this.buffer = Buffer.concat([this.buffer, tempBuffer]) | ||
this.ensureAvailable(INT8_SIZE) | ||
this.buf.writeInt8(value, this.offset) | ||
this.offset += INT8_SIZE | ||
return this | ||
@@ -70,5 +103,5 @@ } | ||
writeInt16(value) { | ||
const tempBuffer = Buffer.alloc(INT16_SIZE) | ||
tempBuffer.writeInt16BE(value) | ||
this.buffer = Buffer.concat([this.buffer, tempBuffer]) | ||
this.ensureAvailable(INT16_SIZE) | ||
this.buf.writeInt16BE(value, this.offset) | ||
this.offset += INT16_SIZE | ||
return this | ||
@@ -78,5 +111,5 @@ } | ||
writeInt32(value) { | ||
const tempBuffer = Buffer.alloc(INT32_SIZE) | ||
tempBuffer.writeInt32BE(value) | ||
this.buffer = Buffer.concat([this.buffer, tempBuffer]) | ||
this.ensureAvailable(INT32_SIZE) | ||
this.buf.writeInt32BE(value, this.offset) | ||
this.offset += INT32_SIZE | ||
return this | ||
@@ -86,5 +119,5 @@ } | ||
writeUInt32(value) { | ||
const tempBuffer = Buffer.alloc(INT32_SIZE) | ||
tempBuffer.writeUInt32BE(value) | ||
this.buffer = Buffer.concat([this.buffer, tempBuffer]) | ||
this.ensureAvailable(INT32_SIZE) | ||
this.buf.writeUInt32BE(value, this.offset) | ||
this.offset += INT32_SIZE | ||
return this | ||
@@ -94,7 +127,7 @@ } | ||
writeInt64(value) { | ||
const tempBuffer = Buffer.alloc(INT64_SIZE) | ||
this.ensureAvailable(INT64_SIZE) | ||
const longValue = Long.fromValue(value) | ||
tempBuffer.writeInt32BE(longValue.getHighBits(), 0) | ||
tempBuffer.writeInt32BE(longValue.getLowBits(), 4) | ||
this.buffer = Buffer.concat([this.buffer, tempBuffer]) | ||
this.buf.writeInt32BE(longValue.getHighBits(), this.offset) | ||
this.buf.writeInt32BE(longValue.getLowBits(), this.offset + INT32_SIZE) | ||
this.offset += INT64_SIZE | ||
return this | ||
@@ -115,6 +148,6 @@ } | ||
const byteLength = Buffer.byteLength(value, 'utf8') | ||
this.ensureAvailable(INT16_SIZE + byteLength) | ||
this.writeInt16(byteLength) | ||
const tempBuffer = Buffer.alloc(byteLength) | ||
tempBuffer.write(value, 0, byteLength, 'utf8') | ||
this.buffer = Buffer.concat([this.buffer, tempBuffer]) | ||
this.buf.write(value, this.offset, byteLength, 'utf8') | ||
this.offset += byteLength | ||
return this | ||
@@ -131,5 +164,5 @@ } | ||
this.writeVarInt(byteLength) | ||
const tempBuffer = Buffer.alloc(byteLength) | ||
tempBuffer.write(value, 0, byteLength, 'utf8') | ||
this.buffer = Buffer.concat([this.buffer, tempBuffer]) | ||
this.ensureAvailable(byteLength) | ||
this.buf.write(value, this.offset, byteLength, 'utf8') | ||
this.offset += byteLength | ||
return this | ||
@@ -146,11 +179,12 @@ } | ||
// raw bytes | ||
this.ensureAvailable(INT32_SIZE + value.length) | ||
this.writeInt32(value.length) | ||
this.buffer = Buffer.concat([this.buffer, value]) | ||
this.writeBufferInternal(value) | ||
} else { | ||
const valueToWrite = String(value) | ||
const byteLength = Buffer.byteLength(valueToWrite, 'utf8') | ||
this.ensureAvailable(INT32_SIZE + byteLength) | ||
this.writeInt32(byteLength) | ||
const tempBuffer = Buffer.alloc(byteLength) | ||
tempBuffer.write(valueToWrite, 0, byteLength, 'utf8') | ||
this.buffer = Buffer.concat([this.buffer, tempBuffer]) | ||
this.buf.write(valueToWrite, this.offset, byteLength, 'utf8') | ||
this.offset += byteLength | ||
} | ||
@@ -170,3 +204,3 @@ | ||
this.writeVarInt(value.length) | ||
this.buffer = Buffer.concat([this.buffer, value]) | ||
this.writeBufferInternal(value) | ||
} else { | ||
@@ -176,5 +210,5 @@ const valueToWrite = String(value) | ||
this.writeVarInt(byteLength) | ||
const tempBuffer = Buffer.alloc(byteLength) | ||
tempBuffer.write(valueToWrite, 0, byteLength, 'utf8') | ||
this.buffer = Buffer.concat([this.buffer, tempBuffer]) | ||
this.ensureAvailable(byteLength) | ||
this.buf.write(valueToWrite, this.offset, byteLength, 'utf8') | ||
this.offset += byteLength | ||
} | ||
@@ -186,19 +220,18 @@ | ||
writeEncoder(value) { | ||
if (value == null || value.buffer == null) { | ||
if (value == null || !Buffer.isBuffer(value.buf)) { | ||
throw new Error('value should be an instance of Encoder') | ||
} | ||
return this.writeBuffer(value.buffer) | ||
this.writeBufferInternal(value.buffer) | ||
return this | ||
} | ||
writeEncoderArray(value) { | ||
if (!Array.isArray(value) || value.some(v => v == null || !Buffer.isBuffer(v.buffer))) { | ||
if (!Array.isArray(value) || value.some(v => v == null || !Buffer.isBuffer(v.buf))) { | ||
throw new Error('all values should be an instance of Encoder[]') | ||
} | ||
const newBuffer = [this.buffer] | ||
value.forEach(v => { | ||
newBuffer.push(v.buffer) | ||
this.writeBufferInternal(v.buffer) | ||
}) | ||
this.buffer = Buffer.concat(newBuffer) | ||
return this | ||
@@ -212,3 +245,3 @@ } | ||
this.buffer = Buffer.concat([this.buffer, value]) | ||
this.writeBufferInternal(value) | ||
return this | ||
@@ -225,3 +258,4 @@ } | ||
const length = array.length !== 0 ? array.length : -1 | ||
return this.writeArray(array, type, length) | ||
this.writeArray(array, type, length) | ||
return this | ||
} | ||
@@ -292,3 +326,3 @@ | ||
byteArray.push(encodedValue & OTHER_BITS) | ||
this.buffer = Buffer.concat([this.buffer, Buffer.from(byteArray)]) | ||
this.writeBufferInternal(Buffer.from(byteArray)) | ||
return this | ||
@@ -313,3 +347,3 @@ } | ||
this.buffer = Buffer.concat([this.buffer, Buffer.from(byteArray)]) | ||
this.writeBufferInternal(Buffer.from(byteArray)) | ||
return this | ||
@@ -319,3 +353,4 @@ } | ||
size() { | ||
return Buffer.byteLength(this.buffer) | ||
// We can use the offset here directly, because we anyways will not re-encode the buffer when writing | ||
return this.offset | ||
} | ||
@@ -322,0 +357,0 @@ |
521235
0.12%15430
0.2%