kafkajs - npm Package Compare versions

Comparing version 1.3.0 to 1.3.1

src/admin/instrumentationEvents.js


CHANGELOG.md

@@ -8,2 +8,12 @@ # Changelog

+## [1.3.1] - 2018-08-20
+### Fixed
+- Client logger accessor #106
+- Producer v3 decode format #114
+- Parsing multiple responses #115
+- Fetch v4 for partial messages on record batch #116
+### Added
+- Connection instrumentation events #110
 ## [1.3.0] - 2018-08-06

@@ -18,3 +28,3 @@ ### Fixed

 - Notify user when setting heartbeat interval to same or higher than session timeout #91
-- Constantly refresh metatada based on `metadataMaxAge` #94
+- Constantly refresh metadata based on `metadataMaxAge` #94
 - New instrumentation events #95

@@ -21,0 +31,0 @@ - Expose loggers #97 #102

package.json

 {
   "name": "kafkajs",
-  "version": "1.3.0",
+  "version": "1.3.1",
   "description": "A modern Apache Kafka client for node.js",

@@ -28,6 +28,16 @@ "author": "Tulio Ornelas <ornelas.tulio@gmail.com>",

"test:debug": "NODE_ENV=test node --inspect-brk node_modules/.bin/jest --runInBand --watch",
"test": "NODE_ENV=test yarn lint && JEST_JUNIT_OUTPUT=test-report.xml ./scripts/testWithKafka.sh './node_modules/.bin/jest --maxWorkers=4 --no-watchman --forceExit'",
"test": "NODE_ENV=test yarn lint && JEST_JUNIT_OUTPUT=test-report.xml ./scripts/testWithKafka.sh './node_modules/.bin/jest --ci --maxWorkers=4 --no-watchman --forceExit'",
"lint": "find . -path ./node_modules -prune -o -path ./coverage -prune -o -name '*.js' -print0 | xargs -0 ./node_modules/.bin/eslint",
"format": "find . -path ./node_modules -prune -o -path ./coverage -prune -o -name '*.js' -print0 | xargs -0 ./node_modules/.bin/prettier --write",
"precommit": "lint-staged"
"precommit": "lint-staged",
"test:group:broker": "./node_modules/.bin/jest --forceExit --testPathPattern 'src/broker/.*'",
"test:group:admin": "NODE_ENV=test ./node_modules/.bin/jest --forceExit --testPathPattern 'src/admin/.*'",
"test:group:producer": "NODE_ENV=test ./node_modules/.bin/jest --forceExit --testPathPattern 'src/producer/.*'",
"test:group:consumer": "NODE_ENV=test ./node_modules/.bin/jest --forceExit --testPathPattern 'src/consumer/.*'",
"test:group:others": "NODE_ENV=test ./node_modules/.bin/jest --forceExit --testPathPattern 'src/(?!(broker|admin|producer|consumer)/).*'",
"test:group:broker:ci": "NODE_ENV=test JEST_JUNIT_OUTPUT=test-report.xml ./scripts/testWithKafka.sh \"yarn test:group:broker --ci --maxWorkers=4 --no-watchman\"",
"test:group:admin:ci": "NODE_ENV=test JEST_JUNIT_OUTPUT=test-report.xml ./scripts/testWithKafka.sh \"yarn test:group:admin --ci --maxWorkers=4 --no-watchman\"",
"test:group:producer:ci": "NODE_ENV=test JEST_JUNIT_OUTPUT=test-report.xml ./scripts/testWithKafka.sh \"yarn test:group:producer --ci --maxWorkers=4 --no-watchman\"",
"test:group:consumer:ci": "NODE_ENV=test JEST_JUNIT_OUTPUT=test-report.xml ./scripts/testWithKafka.sh \"yarn test:group:consumer --ci --maxWorkers=4 --no-watchman\"",
"test:group:others:ci": "NODE_ENV=test JEST_JUNIT_OUTPUT=test-report.xml ./scripts/testWithKafka.sh \"yarn test:group:others --ci --maxWorkers=4 --no-watchman\""
},

@@ -34,0 +44,0 @@ "devDependencies": {


README.md

@@ -42,3 +42,3 @@ [![KafkaJS](https://raw.githubusercontent.com/tulios/kafkajs/master/logo.png)](https://github.com/tulios/kafkajs)

 - [Options](#consuming-messages-options)
-- [Pause, Resume, & Seek](#consuming-messages-pause-resume)
+- [Pause & Resume](#consuming-messages-pause-resume)
+- [Seek](#consuming-messages-seek)

@@ -133,3 +133,3 @@ - [Custom partition assigner](#consuming-messages-custom-partition-assigner)

-The `retry` option can be used to set the configuration of the retry mechanism, which is be used to retry connections and API calls to Kafka (when using producers or consumers).
+The `retry` option can be used to set the configuration of the retry mechanism, which is used to retry connections and API calls to Kafka (when using producers or consumers).
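
For context, a minimal sketch of passing `retry` to the client; the option names (`initialRetryTime`, `retries`) are assumed from the KafkaJS documentation of this era, not taken from this diff:

```javascript
const { Kafka } = require('kafkajs')

// Hedged sketch: tune how connections and API calls are retried
const kafka = new Kafka({
  clientId: 'example-app',
  brokers: ['localhost:9092'],
  retry: {
    initialRetryTime: 300, // ms before the first retry
    retries: 5, // give up after five attempts
  },
})
```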

@@ -722,3 +722,3 @@ The retry mechanism uses a randomization function that grows exponentially.

-The admin client will host all the cluster operations, such as: `createTopics`, `createPartitions`, etc. Currently, only `createTopics` is available.
+The admin client will host all the cluster operations, such as: `createTopics`, `createPartitions`, etc.

@@ -796,3 +796,3 @@ ```javascript

-`setOffsets` allows you to set the consumer group offset to any value offset.
+`setOffsets` allows you to set the consumer group offset to any value.
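
A hedged usage sketch; the argument shape is assumed from the KafkaJS admin docs rather than from this diff:

```javascript
// Rewind a consumer group to the beginning of one partition
await admin.setOffsets({
  groupId: 'example-group',
  topic: 'example-topic',
  partitions: [{ partition: 0, offset: '0' }],
})
```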

@@ -833,3 +833,3 @@ ```javascript

-Some operations are instrumented using the `EventEmitter`. To receive the events use the method `consumer#on`, example:
+Some operations are instrumented using the `EventEmitter`. To receive the events use the method `consumer#on`, `producer#on` and `admin#on`, example:

@@ -844,4 +844,17 @@ ```javascript

+Instrumentation Event:
+```javascript
+{
+  id: <Number>,
+  type: <String>,
+  timestamp: <Number>,
+  payload: <Object>
+}
+```
+
+List of available events:
+### Consumer
+* consumer.events.HEARTBEAT
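
A usage sketch for the event shape documented above; it assumes, per the `@return {Function}` JSDoc in the implementations further down, that `on` returns an unsubscribe function:

```javascript
// Hedged sketch: subscribe to one of the events listed above.
const removeListener = consumer.on(consumer.events.HEARTBEAT, event => {
  // `event` has the documented shape { id, type, timestamp, payload }
  console.log(`heartbeat at ${event.timestamp}`, event.payload)
})

// Later: stop listening (assuming the returned function unsubscribes)
removeListener()
```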

@@ -865,13 +878,20 @@ payload: {`groupId`, `memberId`, `groupGenerationId`}

-Instrumentation Event:
-```javascript
-{
-  id: <Number>,
-  type: <String>,
-  timestamp: <Number>,
-  payload: <Object>
-}
-```
+* consumer.events.CONNECT
+* consumer.events.DISCONNECT
+* consumer.events.STOP
+### Producer
+* producer.events.CONNECT
+* producer.events.DISCONNECT
+### Admin
+* admin.events.CONNECT
+* admin.events.DISCONNECT
 ## <a name="custom-logging"></a> Custom logging

@@ -878,0 +898,0 @@

src/admin/index.js

 const createRetry = require('../retry')
 const waitFor = require('../utils/waitFor')
 const createConsumer = require('../consumer')
+const InstrumentationEventEmitter = require('../instrumentation/emitter')
+const events = require('./instrumentationEvents')
+const { CONNECT, DISCONNECT } = require('./instrumentationEvents')
 const { LEVELS } = require('../loggers')
 const { KafkaJSNonRetriableError } = require('../errors')
+
+const { values, keys } = Object
+const eventNames = values(events)
+const eventKeys = keys(events)
+  .map(key => `admin.events.${key}`)
+  .join(', ')
+
 const retryOnLeaderNotAvailable = (fn, opts = {}) => {

@@ -35,2 +44,3 @@ const callback = async () => {

   const logger = rootLogger.namespace('Admin')
+  const instrumentationEmitter = new InstrumentationEventEmitter()

@@ -40,3 +50,6 @@ /**

    */
-  const connect = async () => await cluster.connect()
+  const connect = async () => {
+    await cluster.connect()
+    instrumentationEmitter.emit(CONNECT)
+  }

@@ -46,3 +59,6 @@ /**

    */
-  const disconnect = async () => await cluster.disconnect()
+  const disconnect = async () => {
+    await cluster.disconnect()
+    instrumentationEmitter.emit(DISCONNECT)
+  }

@@ -222,2 +238,22 @@ /**

+  /**
+   * @param {string} eventName
+   * @param {Function} listener
+   * @return {Function}
+   */
+  const on = (eventName, listener) => {
+    if (!eventNames.includes(eventName)) {
+      throw new KafkaJSNonRetriableError(`Event name should be one of ${eventKeys}`)
+    }
+
+    return instrumentationEmitter.addListener(eventName, event => {
+      Promise.resolve(listener(event)).catch(e => {
+        logger.error(`Failed to execute listener: ${e.message}`, {
+          eventName,
+          stack: e.stack,
+        })
+      })
+    })
+  }
+
   /**
    * @return {Object} logger

@@ -231,3 +267,5 @@ */

     createTopics,
+    events,
     fetchOffsets,
+    on,
     setOffsets,

@@ -234,0 +272,0 @@ resetOffsets,
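
With `events` and `on` now exported, instrumentation can be consumed straight from the admin client; a hedged usage sketch:

```javascript
// Unknown event names throw KafkaJSNonRetriableError, per the `on`
// implementation above.
admin.on(admin.events.CONNECT, () => console.log('admin connected'))
admin.on(admin.events.DISCONNECT, () => console.log('admin disconnected'))
```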

src/consumer/index.js

@@ -7,2 +7,3 @@ const Long = require('long')

 const InstrumentationEventEmitter = require('../instrumentation/emitter')
+const { CONNECT, DISCONNECT, STOP } = require('./instrumentationEvents')
 const { KafkaJSNonRetriableError } = require('../errors')

@@ -91,3 +92,6 @@ const { roundRobin } = require('./assigners')

    */
-  const connect = async () => await cluster.connect()
+  const connect = async () => {
+    await cluster.connect()
+    instrumentationEmitter.emit(CONNECT)
+  }

@@ -102,2 +106,3 @@ /**

       await cluster.disconnect()
+      instrumentationEmitter.emit(DISCONNECT)
     } catch (e) {}

@@ -115,2 +120,3 @@ }

     consumerGroup = null
+    instrumentationEmitter.emit(STOP)
   }

@@ -117,0 +123,0 @@

src/consumer/instrumentationEvents.js

@@ -11,2 +11,5 @@ const InstrumentationEventType = require('../instrumentation/eventType')

   END_BATCH_PROCESS: consumerType('end_batch_process'),
+  CONNECT: consumerType('connect'),
+  DISCONNECT: consumerType('disconnect'),
+  STOP: consumerType('stop'),
 }

src/index.js

@@ -8,4 +8,6 @@ const { createLogger, LEVELS: { INFO } } = require('./loggers')

 const { assign } = Object
-const privateCreateCluster = Symbol('private:Kafka:createCluster')
+const PRIVATE = {
+  CREATE_CLUSTER: Symbol('private:Kafka:createCluster'),
+  LOGGER: Symbol('private:Kafka:logger'),
+}

@@ -25,6 +27,6 @@ module.exports = class Client {

   }) {
-    this.logger = createLogger({ level: logLevel, logCreator })
-    this[privateCreateCluster] = (metadataMaxAge = 300000) =>
+    this[PRIVATE.LOGGER] = createLogger({ level: logLevel, logCreator })
+    this[PRIVATE.CREATE_CLUSTER] = (metadataMaxAge = 300000) =>
       new Cluster({
-        logger: this.logger,
+        logger: this[PRIVATE.LOGGER],
         brokers,

@@ -46,6 +48,6 @@ ssl,

   producer({ createPartitioner, retry, metadataMaxAge } = {}) {
-    const cluster = this[privateCreateCluster](metadataMaxAge)
+    const cluster = this[PRIVATE.CREATE_CLUSTER](metadataMaxAge)
     return createProducer({
-      retry: assign({}, cluster.retry, retry),
-      logger: this.logger,
+      retry: { ...cluster.retry, ...retry },
+      logger: this[PRIVATE.LOGGER],
       cluster,

@@ -73,6 +75,6 @@ createPartitioner,

   ) {
-    const cluster = this[privateCreateCluster](metadataMaxAge)
+    const cluster = this[PRIVATE.CREATE_CLUSTER](metadataMaxAge)
     return createConsumer({
-      retry: assign({}, cluster.retry, retry),
-      logger: this.logger,
+      retry: { ...cluster.retry, ...retry },
+      logger: this[PRIVATE.LOGGER],
       cluster,

@@ -94,6 +96,6 @@ groupId,

   admin({ retry } = {}) {
-    const cluster = this[privateCreateCluster]()
+    const cluster = this[PRIVATE.CREATE_CLUSTER]()
     return createAdmin({
-      retry: assign({}, cluster.retry, retry),
-      logger: this.logger,
+      retry: { ...cluster.retry, ...retry },
+      logger: this[PRIVATE.LOGGER],
       cluster,

@@ -107,4 +109,4 @@ })

   logger() {
-    return this.logger
+    return this[PRIVATE.LOGGER]
   }
 }
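
The Symbol keys also fix the logger accessor bug (#106) from the changelog: assigning `this.logger` in the constructor shadowed the `logger()` method with a plain object. A standalone sketch with hypothetical classes, not KafkaJS code:

```javascript
class Broken {
  constructor() {
    this.logger = { info: console.log } // own property shadows the method below
  }
  logger() {
    return this.logger
  }
}
// new Broken().logger() throws: "logger is not a function"

// A Symbol key cannot collide with the public method name:
const LOGGER = Symbol('logger')

class Fixed {
  constructor() {
    this[LOGGER] = { info: console.log }
  }
  logger() {
    return this[LOGGER]
  }
}

new Fixed().logger().info('works') // prints "works"
```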

src/network/connection.js

@@ -307,39 +307,41 @@ const createRetry = require('../retry')

-    // Not enough bytes to read the expected response size, keep buffering
-    if (Buffer.byteLength(this.buffer) <= Decoder.int32Size()) {
-      return
-    }
-
-    const data = Buffer.from(this.buffer)
-    const decoder = new Decoder(data)
-    const expectedResponseSize = decoder.readInt32()
-
-    if (!decoder.canReadBytes(expectedResponseSize)) {
-      return
-    }
-
-    // The full payload is loaded, erase the temporary buffer
-    this.buffer = Buffer.alloc(0)
-
-    if (this.authHandlers) {
-      return this.authHandlers.onSuccess(data)
-    }
-
-    const correlationId = decoder.readInt32()
-    const payload = decoder.readAll()
-
-    const entry = this.pendingQueue[correlationId]
-    delete this.pendingQueue[correlationId]
-
-    if (!entry) {
-      this.logDebug(`Response without match`, { correlationId })
-      return
-    }
-
-    entry.resolve({
-      size: expectedResponseSize,
-      correlationId,
-      entry,
-      payload,
-    })
+    // Process data if there are enough bytes to read the expected response size,
+    // otherwise keep buffering
+    while (Buffer.byteLength(this.buffer) > Decoder.int32Size()) {
+      const data = Buffer.from(this.buffer)
+      const decoder = new Decoder(data)
+      const expectedResponseSize = decoder.readInt32()
+
+      if (!decoder.canReadBytes(expectedResponseSize)) {
+        return
+      }
+
+      const response = new Decoder(decoder.readBytes(expectedResponseSize))
+      // Reset the buffer as the rest of the bytes
+      this.buffer = decoder.readAll()
+
+      if (this.authHandlers) {
+        const rawResponseSize = Decoder.int32Size() + expectedResponseSize
+        const rawResponseBuffer = data.slice(0, rawResponseSize)
+        return this.authHandlers.onSuccess(rawResponseBuffer)
+      }
+
+      const correlationId = response.readInt32()
+      const payload = response.readAll()
+
+      const entry = this.pendingQueue[correlationId]
+      delete this.pendingQueue[correlationId]
+
+      if (!entry) {
+        this.logDebug(`Response without match`, { correlationId })
+        return
+      }
+
+      entry.resolve({
+        size: expectedResponseSize,
+        correlationId,
+        entry,
+        payload,
+      })
+    }

@@ -346,0 +348,0 @@
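
This rewrite is the fix for parsing multiple responses (#115): several length-prefixed responses can now arrive in a single chunk and all of them are drained. The same framing loop in isolation, as a standalone sketch with a hypothetical `handleFrame` (plain Node.js, not the KafkaJS implementation):

```javascript
const handleFrame = frame => console.log('frame:', frame.length, 'bytes')

let pending = Buffer.alloc(0)

const onData = chunk => {
  pending = Buffer.concat([pending, chunk])

  // Each frame is a 4-byte big-endian length followed by the payload
  while (pending.length > 4) {
    const size = pending.readInt32BE(0)
    if (pending.length < 4 + size) {
      break // partial frame: wait for more data
    }
    handleFrame(pending.slice(4, 4 + size))
    pending = pending.slice(4 + size)
  }
}
```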

src/producer/index.js

 const createRetry = require('../retry')
 const createDefaultPartitioner = require('./partitioners/default')
 const createSendMessages = require('./sendMessages')
+const InstrumentationEventEmitter = require('../instrumentation/emitter')
+const events = require('./instrumentationEvents')
+const { CONNECT, DISCONNECT } = require('./instrumentationEvents')
+const { KafkaJSNonRetriableError } = require('../errors')
+
+const { values, keys } = Object
+const eventNames = values(events)
+const eventKeys = keys(events)
+  .map(key => `producer.events.${key}`)
+  .join(', ')
+
 module.exports = ({

@@ -14,2 +23,3 @@ cluster,

   const retrier = createRetry(Object.assign({}, cluster.retry, retry))
+  const instrumentationEmitter = new InstrumentationEventEmitter()
   const logger = rootLogger.namespace('Producer')

@@ -112,2 +122,22 @@ const sendMessages = createSendMessages({ logger, cluster, partitioner })

+  /**
+   * @param {string} eventName
+   * @param {Function} listener
+   * @return {Function}
+   */
+  const on = (eventName, listener) => {
+    if (!eventNames.includes(eventName)) {
+      throw new KafkaJSNonRetriableError(`Event name should be one of ${eventKeys}`)
+    }
+
+    return instrumentationEmitter.addListener(eventName, event => {
+      Promise.resolve(listener(event)).catch(e => {
+        logger.error(`Failed to execute listener: ${e.message}`, {
+          eventName,
+          stack: e.stack,
+        })
+      })
+    })
+  }
+
   /**
    * @returns {Object} logger

@@ -121,3 +151,6 @@ */

      */
-    connect: async () => await cluster.connect(),
+    connect: async () => {
+      await cluster.connect()
+      instrumentationEmitter.emit(CONNECT)
+    },

@@ -127,4 +160,11 @@ /**

      */
-    disconnect: async () => await cluster.disconnect(),
+    disconnect: async () => {
+      await cluster.disconnect()
+      instrumentationEmitter.emit(DISCONNECT)
+    },
+    events,
+    on,
     send,

@@ -131,0 +171,0 @@
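
As in the admin client, listener failures are isolated; a hedged usage sketch of the new producer events:

```javascript
// Per the wrapper above, a rejecting listener is only logged
// ("Failed to execute listener: boom") and never crashes the producer.
producer.on(producer.events.CONNECT, async () => {
  throw new Error('boom')
})
```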

src/protocol/decoder.js

@@ -93,5 +93,3 @@ const Long = require('long')

-  readBytes() {
-    const byteLength = this.readInt32()
+  readBytes(byteLength = this.readInt32()) {
     if (byteLength === -1) {

@@ -98,0 +96,0 @@ return null
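
Making the length an optional parameter keeps existing callers working while letting new callers check the size before consuming it. A hedged sketch of the two call styles (`decoder` is assumed to be an instance of this class):

```javascript
// Style 1, as before: readBytes consumes its own int32 length prefix
const buf1 = decoder.readBytes()

// Style 2, new: inspect the size first, then pass it in explicitly
const size = decoder.readInt32()
const buf2 = decoder.canReadBytes(size) ? decoder.readBytes(size) : null
```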

src/protocol/requests/fetch/v4/response.js

@@ -29,3 +29,9 @@ const Decoder = require('../../../decoder')

 const decodeMessages = async decoder => {
-  const messagesBuffer = decoder.readBytes()
+  const messagesSize = decoder.readInt32()
+
+  if (messagesSize <= 0 || !decoder.canReadBytes(messagesSize)) {
+    return []
+  }
+
+  const messagesBuffer = decoder.readBytes(messagesSize)
   const magicByte = messagesBuffer.slice(MAGIC_OFFSET).readInt8()

@@ -32,0 +38,0 @@
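
This guard is the fix for partial messages on record batches (#116): a truncated fetch payload now decodes to an empty message list instead of throwing. A hypothetical helper restating the idea:

```javascript
// Decode only when the declared byte range has fully arrived
const readCompleteBytes = decoder => {
  const size = decoder.readInt32()
  if (size <= 0 || !decoder.canReadBytes(size)) {
    return null // partial payload: caller treats it as "nothing yet"
  }
  return decoder.readBytes(size)
}
```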

src/protocol/requests/produce/v3/response.js

@@ -26,3 +26,3 @@ const Decoder = require('../../../decoder')

   const decoder = new Decoder(rawData)
-  const responses = decoder.readArray(decoder => ({
+  const topics = decoder.readArray(decoder => ({
     topicName: decoder.readString(),

@@ -35,3 +35,3 @@ partitions: decoder.readArray(partition),

   return {
-    responses,
+    topics,
     throttleTime,

@@ -42,3 +42,3 @@ }

 const parse = async data => {
-  const partitionsWithError = data.responses.map(response => {
+  const partitionsWithError = data.topics.map(response => {
     return response.partitions.filter(partition => failure(partition.errorCode))

@@ -45,0 +45,0 @@ })
