New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

kafkajs

Package Overview
Dependencies
Maintainers
2
Versions
299
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

kafkajs - npm Package Compare versions

Comparing version 0.4.1 to 0.5.0

src/consumer/instrumentationEvents.js

2

package.json
{
"name": "kafkajs",
"version": "0.4.1",
"version": "0.5.0",
"description": "A modern Apache Kafka client for node.js",

@@ -5,0 +5,0 @@ "author": "Tulio Ornelas <ornelas.tulio@gmail.com>",

@@ -5,2 +5,3 @@ const flatten = require('../utils/flatten')

const { KafkaJSError } = require('../errors')
const { HEARTBEAT } = require('./instrumentationEvents')

@@ -22,2 +23,3 @@ const { keys } = Object

logger,
instrumentationEmitter,
assigner,

@@ -35,2 +37,3 @@ sessionTimeout,

this.logger = logger.namespace('ConsumerGroup')
this.instrumentationEmitter = instrumentationEmitter
this.assigner = assigner

@@ -109,2 +112,3 @@ this.sessionTimeout = sessionTimeout

topicConfigurations: this.topicConfigurations,
instrumentationEmitter: this.instrumentationEmitter,
coordinator,

@@ -134,8 +138,10 @@ memberAssignment,

if (now > this.lastRequest + interval) {
await this.coordinator.heartbeat({
const payload = {
groupId,
memberId,
groupGenerationId: generationId,
})
}
await this.coordinator.heartbeat(payload)
this.instrumentationEmitter.emit(HEARTBEAT, payload)
this.lastRequest = Date.now()

@@ -142,0 +148,0 @@ }

const createRoundRobinAssigned = require('./assigners/roundRobinAssigner')
const ConsumerGroup = require('./consumerGroup')
const Runner = require('./runner')
const events = require('./instrumentationEvents')
const InstrumentationEventEmitter = require('../instrumentation/emitter')
const { KafkaJSError } = require('../errors')
const eventNames = Object.values(events)
const eventKeys = Object.keys(events)
.map(key => `consumer.events.${key}`)
.join(', ')
module.exports = ({

@@ -20,2 +28,3 @@ cluster,

}) => {
const instrumentationEmitter = new InstrumentationEventEmitter()
const assigner = createPartitionAssigner({ cluster })

@@ -39,2 +48,3 @@ const logger = rootLogger.namespace('Consumer')

maxWaitTimeInMs,
instrumentationEmitter,
})

@@ -107,2 +117,17 @@

/**
* @param {string} eventName
* @param {Function} listener
* @return {Function}
*/
const on = (eventName, listener) => {
if (!eventNames.includes(eventName)) {
throw new KafkaJSError(`Event name should be one of ${eventKeys}`, {
retriable: false,
})
}
return instrumentationEmitter.addListener(eventName, event => listener(event))
}
return {

@@ -113,3 +138,5 @@ connect,

run,
on,
events,
}
}
const Long = require('long')
const isInvalidOffset = require('./isInvalidOffset')
const initializeConsumerOffsets = require('./initializeConsumerOffsets')
const { COMMIT_OFFSETS } = require('../instrumentationEvents')

@@ -14,2 +15,3 @@ const { keys, assign } = Object

topicConfigurations,
instrumentationEmitter,
groupId,

@@ -23,2 +25,3 @@ generationId,

this.topicConfigurations = topicConfigurations
this.instrumentationEmitter = instrumentationEmitter
this.groupId = groupId

@@ -138,3 +141,3 @@ this.generationId = generationId

await this.coordinator.offsetCommit({
const payload = {
groupId,

@@ -144,4 +147,7 @@ memberId,

topics: topicsWithPartitionsToCommit,
})
}
await this.coordinator.offsetCommit(payload)
this.instrumentationEmitter.emit(COMMIT_OFFSETS, payload)
// Update local reference of committed offsets

@@ -148,0 +154,0 @@ topicsWithPartitionsToCommit.forEach(({ topic, partitions }) => {

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc