
kafkajs - npm Package Compare versions

Comparing version 1.4.0 to 1.4.1

src/env.js


CHANGELOG.md

@@ -8,2 +8,17 @@ # Changelog

## [1.4.1] - 2018-10-17
### Fixed
- Decode multiple RecordBatch on protocol Fetch v4 #179
- Skip incomplete record batches #182
- Producer with `acks=0` never resolves #181
### Added
- Runtime flag for displaying buffers in debug output #176
- Add ZSTD to compression codecs and types #157
- Admin get topic metadata #174
### Changed
- Add description to lock instances #178
## [1.4.0] - 2018-10-09

@@ -10,0 +25,0 @@


package.json
{
  "name": "kafkajs",
  "version": "1.4.0",
  "version": "1.4.1",
  "description": "A modern Apache Kafka client for node.js",

@@ -25,4 +25,4 @@ "author": "Tulio Ornelas <ornelas.tulio@gmail.com>",

"scripts": {
"test:local": "export KAFKA_VERSION=${KAFKA_VERSION:='0.11'} && NODE_ENV=test echo \"KAFKA_VERSION: ${KAFKA_VERSION}\" && ./node_modules/.bin/jest --forceExit --detectOpenHandles",
"test:debug": "NODE_ENV=test node --inspect-brk node_modules/.bin/jest --detectOpenHandles --runInBand --watch",
"test:local": "export KAFKA_VERSION=${KAFKA_VERSION:='0.11'} && NODE_ENV=test echo \"KAFKA_VERSION: ${KAFKA_VERSION}\" && KAFKAJS_DEBUG_PROTOCOL_BUFFERS=1 ./node_modules/.bin/jest --forceExit --detectOpenHandles",
"test:debug": "NODE_ENV=test KAFKAJS_DEBUG_PROTOCOL_BUFFERS=1 node --inspect-brk node_modules/.bin/jest --detectOpenHandles --runInBand --watch",
"test:local:watch": "yarn test:local --watch",

@@ -29,0 +29,0 @@ "test": "yarn lint && JEST_JUNIT_OUTPUT=test-report.xml ./scripts/testWithKafka.sh 'yarn test:local --ci --maxWorkers=4 --no-watchman'",

@@ -17,2 +17,3 @@ [![KafkaJS](https://raw.githubusercontent.com/tulios/kafkajs/master/logo.png)](https://github.com/tulios/kafkajs)

- Consumer groups with pause, resume, and seek
- Message headers
- GZIP compression

@@ -57,2 +58,3 @@ - Snappy and LZ4 compression through plugins

- [Delete topics](#admin-delete-topics)
- [Get topic metadata](#admin-get-topic-metadata)
- [Fetch consumer group offsets](#admin-fetch-offsets)

@@ -67,2 +69,3 @@ - [Reset consumer group offsets](#admin-reset-offsets)

- [Development](#development)
- [Environment variables](#environment-variables)

@@ -306,6 +309,11 @@ ## <a name="installation"></a> Installation

topic: 'topic-c',
messages: [{ key: 'key', value: 'hello topic-c' }],
headers: {
  'correlation-id': '2bfb68bb-893a-423b-a7fa-7b568cad5b67',
}
messages: [
  {
    key: 'key',
    value: 'hello topic-c',
    headers: {
      'correlation-id': '2bfb68bb-893a-423b-a7fa-7b568cad5b67',
    },
  }
],
}
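On the consuming side, the per-message headers surface on `message.headers`. A minimal sketch, assuming an already connected `consumer`; header values arrive as Buffers:

```javascript
await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    // message.headers maps header names to Buffer values
    const { headers = {} } = message
    const correlationId = headers['correlation-id']
    console.log(topic, partition, correlationId && correlationId.toString())
  },
})
```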

@@ -857,2 +865,46 @@ ]

### <a name="admin-get-topic-metadata"></a> Get topic metadata
```javascript
await admin.getTopicMetadata({ topics: <Array<String>> })
```
`TopicsMetadata` structure:
```javascript
{
  topics: <Array<TopicMetadata>>,
}
```
`TopicMetadata` structure:
```javascript
{
  name: <String>,
  partitions: <Array<PartitionMetadata>>, // default: 1
}
```
`PartitionMetadata` structure:
```javascript
{
  partitionErrorCode: <Number>, // default: 0
  partitionId: <Number>,
  leader: <Number>,
  replicas: <Array<Number>>,
  isr: <Array<Number>>,
}
```
The admin client will throw an exception if any of the provided topics do not already exist.
If you omit the `topics` argument, the admin client will fetch metadata for all topics
of which it is already aware (all of the cluster's target topics):
```javascript
await admin.getTopicMetadata()
```
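Putting it together, a minimal sketch of fetching and printing partition metadata; the client id and broker address are placeholders:

```javascript
const { Kafka } = require('kafkajs')

const kafka = new Kafka({ clientId: 'metadata-example', brokers: ['localhost:9092'] })
const admin = kafka.admin()

const run = async () => {
  await admin.connect()
  const { topics } = await admin.getTopicMetadata({ topics: ['topic-c'] })

  for (const { name, partitions } of topics) {
    // each partition reports its id, leader, replica set, and in-sync replicas
    console.log(name, partitions.map(({ partitionId, leader }) => ({ partitionId, leader })))
  }

  await admin.disconnect()
}

run().catch(console.error)
```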
### <a name="admin-fetch-offsets"></a> Fetch consumer group offsets

@@ -1271,2 +1323,8 @@

### <a name="environment-variables"></a> Environment variables
| variable | description | default |
| ------------------------------ | ---------------------------------------- | ------- |
| KAFKAJS_DEBUG_PROTOCOL_BUFFERS | Output raw protocol buffers in debug log | 0 |
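The flag is read from the environment when a connection is created, so it must be set before the client is instantiated. A minimal sketch; the client id and broker address are placeholders:

```javascript
process.env.KAFKAJS_DEBUG_PROTOCOL_BUFFERS = '1' // or: KAFKAJS_DEBUG_PROTOCOL_BUFFERS=1 node app.js

const { Kafka, logLevel } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'debug-example',
  brokers: ['localhost:9092'],
  logLevel: logLevel.DEBUG, // raw buffers only appear in debug-level log entries
})
```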
## Acknowledgements

@@ -1273,0 +1331,0 @@

@@ -422,2 +422,59 @@ const createRetry = require('../retry')

/**
* Fetch metadata for provided topics.
*
* If no topics are provided, fetch metadata for all topics of which we are aware.
* @see https://kafka.apache.org/protocol#The_Messages_Metadata
*
* @param {Object} [options]
* @param {Array<string>} [options.topics]
* @return {Promise<TopicsMetadata>}
*
* @typedef {Object} TopicsMetadata
* @property {Array<TopicMetadata>} topics
*
* @typedef {Object} TopicMetadata
* @property {String} name
* @property {Array<PartitionMetadata>} partitions
*
* @typedef {Object} PartitionMetadata
* @property {number} partitionErrorCode Response error code
* @property {number} partitionId Topic partition id
* @property {number} leader The id of the broker acting as leader for this partition.
* @property {Array<number>} replicas The set of all nodes that host this partition.
* @property {Array<number>} isr The set of nodes that are in sync with the leader for this partition.
*/
const getTopicMetadata = async options => {
  const { topics } = options || {}

  if (topics) {
    await Promise.all(
      topics.map(async topic => {
        if (!topic) {
          throw new KafkaJSNonRetriableError(`Invalid topic ${topic}`)
        }

        try {
          await cluster.addTargetTopic(topic)
        } catch (e) {
          e.message = `Failed to add target topic ${topic}: ${e.message}`
          throw e
        }
      })
    )
  }

  await cluster.refreshMetadataIfNecessary()
  const targetTopics = topics || [...cluster.targetTopics]

  return {
    topics: await Promise.all(
      targetTopics.map(async topic => ({
        name: topic,
        partitions: await cluster.findTopicPartitionMetadata(topic),
      }))
    ),
  }
}
/**
* @param {string} eventName

@@ -452,2 +509,3 @@ * @param {Function} listener

deleteTopics,
getTopicMetadata,
events,

@@ -454,0 +512,0 @@ fetchOffsets,

@@ -41,4 +41,9 @@ const Lock = require('../utils/lock')

const lockTimeout = this.connection.connectionTimeout + this.authenticationTimeout
this.lock = new Lock({ timeout: lockTimeout })
const brokerAddress = `${this.connection.host}:${this.connection.port}`
this.lock = new Lock({
  timeout: lockTimeout,
  description: `connect to broker ${brokerAddress}`,
})
this.lookupRequest = () => {

@@ -45,0 +50,0 @@ throw new Error('Broker not connected')

@@ -6,2 +6,3 @@ const createRetry = require('../retry')

const { KafkaJSConnectionError } = require('../errors')
const getEnv = require('../env')

@@ -63,2 +64,3 @@ /**

this.logError = log('error')
this.shouldLogBuffers = getEnv().KAFKAJS_DEBUG_PROTOCOL_BUFFERS === '1'
}

@@ -223,2 +225,4 @@

this.failIfNotConnected()
const expectResponse = !request.expectResponse || request.expectResponse()
const requestInfo = ({ apiName, apiKey, apiVersion }) =>

@@ -235,2 +239,3 @@ `${apiName}(key: ${apiKey}, version: ${apiVersion})`

correlationId,
expectResponse,
size: Buffer.byteLength(requestPayload.buffer),

@@ -242,4 +247,17 @@ })

this.failIfNotConnected()
this.pendingQueue[correlationId] = { apiKey, apiName, apiVersion, resolve, reject }
this.socket.write(requestPayload.buffer, 'binary')
let entry = { apiKey, apiName, apiVersion }

if (expectResponse) {
  entry = { ...entry, resolve, reject }
  this.pendingQueue[correlationId] = entry
  this.socket.write(requestPayload.buffer, 'binary')
} else {
  this.socket.write(requestPayload.buffer, 'binary')
  resolve({
    size: 0,
    payload: null,
    correlationId,
    entry,
  })
}
} catch (e) {

@@ -253,2 +271,6 @@ reject(e)

if (!expectResponse) {
  return
}
try {

@@ -274,6 +296,8 @@ const payloadDecoded = await response.decode(payload)

const isBuffer = Buffer.isBuffer(payload)
this.logDebug(`Response ${requestInfo(entry)}`, {
  error: e.message,
  correlationId,
  payload,
  payload:
    isBuffer && !this.shouldLogBuffers ? { type: 'Buffer', data: '[filtered]' } : payload,
})

@@ -280,0 +304,0 @@

@@ -5,5 +5,6 @@ const flatten = require('../utils/flatten')

const partitions = topics.map(({ topicName, partitions }) =>
  partitions.map(partition => Object.assign({ topicName }, partition))
  partitions.map(partition => ({ topicName, ...partition }))
)
return flatten(partitions)
}

@@ -80,3 +80,4 @@ const createRetry = require('../retry')

const response = await broker.produce({ acks, timeout, compression, topicData })
responsePerBroker.set(broker, responseSerializer(response))
const expectResponse = acks !== 0
responsePerBroker.set(broker, expectResponse ? responseSerializer(response) : [])
} catch (e) {

@@ -83,0 +84,0 @@ responsePerBroker.delete(broker)
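Since `acks: 0` requests no broker acknowledgement, there is no Produce response to serialize, and as of this release the send resolves with an empty per-broker result instead of hanging (#181). A minimal sketch, assuming an already connected `producer`:

```javascript
// Fire-and-forget produce: the broker sends no response for acks === 0,
// so the returned promise now resolves immediately
await producer.send({
  topic: 'topic-c',
  acks: 0,
  messages: [{ key: 'key', value: 'hello topic-c' }],
})
```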

@@ -11,2 +11,3 @@ const { KafkaJSNotImplemented } = require('../../../errors')

LZ4: 3,
ZSTD: 4,
}

@@ -22,2 +23,5 @@

},
[Types.ZSTD]: () => {
  throw new KafkaJSNotImplemented('ZSTD compression not implemented')
},
}

@@ -24,0 +28,0 @@
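ZSTD is now part of the codec types, but the codec itself is a stub, so selecting it fails fast instead of silently sending uncompressed data. A minimal sketch, assuming a connected `producer` and that `CompressionTypes` is imported from the package root as in the compression docs:

```javascript
const { CompressionTypes } = require('kafkajs')

// Throws KafkaJSNotImplemented as of 1.4.1: the type exists, the codec does not
await producer.send({
  topic: 'topic-c',
  compression: CompressionTypes.ZSTD,
  messages: [{ key: 'key', value: 'hello topic-c' }],
})
```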

const Decoder = require('../../decoder')
const { KafkaJSPartialMessageError } = require('../../../errors')
const { lookupCodecByRecordBatchAttributes } = require('../../message/compression')
const { KafkaJSPartialMessageError } = require('../../../errors')
const RecordDecoder = require('../record/v0/decoder')

@@ -27,7 +27,9 @@

module.exports = async decoder => {
  const firstOffset = decoder.readInt64().toString()
  const length = decoder.readInt32()
module.exports = async fetchDecoder => {
  const firstOffset = fetchDecoder.readInt64().toString()
  const length = fetchDecoder.readInt32()
  const decoder = fetchDecoder.slice(length)
  fetchDecoder.forward(length)
  const remainingBytes = Buffer.byteLength(decoder.slice(length).buffer)
  const remainingBytes = Buffer.byteLength(decoder.buffer)

@@ -62,6 +64,4 @@ if (remainingBytes < length) {

const recordsSize = Buffer.byteLength(decoder.buffer)
const recordsDecoder = decoder.slice(recordsSize)
const recordContext = { firstOffset, firstTimestamp, magicByte }
const records = await decodeRecords(codec, recordsDecoder, recordContext)
const records = await decodeRecords(codec, decoder, recordContext)

@@ -90,3 +90,3 @@ return {

if (length === -1) {
if (length <= 0) {
  return []

@@ -93,0 +93,0 @@ }

@@ -10,2 +10,3 @@ const Decoder = require('../../../decoder')

const MAGIC_OFFSET = 16
const RECORD_BATCH_OVERHEAD = 49

@@ -41,4 +42,18 @@ /**

if (magicByte === MAGIC_BYTE) {
  const recordBatch = await RecordBatchDecoder(messagesDecoder)
  return recordBatch.records
  let records = []
  while (messagesDecoder.canReadBytes(RECORD_BATCH_OVERHEAD)) {
    try {
      const recordBatch = await RecordBatchDecoder(messagesDecoder)
      records = [...records, ...recordBatch.records]
    } catch (e) {
      // The tail of the record batches can have incomplete records
      // due to how maxBytes works. See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchAPI
      if (e.name === 'KafkaJSPartialMessageError') {
        break
      }
    }
  }
  return records
}

@@ -45,0 +60,0 @@

@@ -67,2 +67,3 @@ const Encoder = require('../../../encoder')

apiName: 'Produce',
expectResponse: () => acks !== 0,
encode: async () => {

@@ -69,0 +70,0 @@ return new Encoder()

@@ -13,2 +13,3 @@ const Encoder = require('../../../encoder')

apiName: 'Produce',
expectResponse: () => acks !== 0,
encode: async () => {

@@ -15,0 +16,0 @@ const encodeTopic = topicEncoder(compression)

@@ -36,2 +36,3 @@ const Encoder = require('../../../encoder')

apiName: 'Produce',
expectResponse: () => acks !== 0,
encode: async () => {

@@ -38,0 +39,0 @@ const encodeTopic = topicEncoder(compression)

@@ -0,1 +1,2 @@

const { format } = require('util')
const { KafkaJSLockTimeout } = require('../errors')

@@ -7,9 +8,16 @@

  WAITING: Symbol('private:Lock:waiting'),
  TIMEOUT_ERROR_MESSAGE: Symbol('private:Lock:timeoutErrorMessage'),
}

const TIMEOUT_MESSAGE = 'Timeout while acquiring lock (%d waiting locks)'

module.exports = class Lock {
  constructor({ timeout = 1000 } = {}) {
  constructor({ timeout = 1000, description = null } = {}) {
    this[PRIVATE.LOCKED] = false
    this[PRIVATE.TIMEOUT] = timeout
    this[PRIVATE.WAITING] = new Set()
    this[PRIVATE.TIMEOUT_ERROR_MESSAGE] = () => {
      const timeoutMessage = format(TIMEOUT_MESSAGE, this[PRIVATE.WAITING].size)
      return description ? `${timeoutMessage}: "${description}"` : timeoutMessage
    }
  }

@@ -36,3 +44,3 @@

timeoutId = setTimeout(
  () => reject(new KafkaJSLockTimeout('Timeout while acquiring lock')),
  () => reject(new KafkaJSLockTimeout(this[PRIVATE.TIMEOUT_ERROR_MESSAGE]())),
  this[PRIVATE.TIMEOUT]

@@ -39,0 +47,0 @@ )
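The description threads through to the timeout error, which makes stuck-lock logs actionable. A hedged sketch of the internal utility (not public API; the require path follows the source layout in this diff, and `acquire` is the method whose timeout is shown above):

```javascript
const Lock = require('./src/utils/lock')

const lock = new Lock({ timeout: 50, description: 'connect to broker localhost:9092' })

const run = async () => {
  await lock.acquire() // hold the lock so the next acquire has to wait

  try {
    await lock.acquire() // times out after 50ms
  } catch (e) {
    // KafkaJSLockTimeout: Timeout while acquiring lock (1 waiting locks): "connect to broker localhost:9092"
    console.log(e.message)
  }
}

run()
```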
