New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

kafkajs

Package Overview
Dependencies
Maintainers
2
Versions
299
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

kafkajs - npm Package Compare versions

Comparing version

to
1.13.0-beta.6

6

package.json
{
"name": "kafkajs",
"version": "1.13.0-beta.5",
"version": "1.13.0-beta.6",
"description": "A modern Apache Kafka client for node.js",

@@ -82,5 +82,5 @@ "author": "Tulio Ornelas <ornelas.tulio@gmail.com>",

"kafkajs": {
"sha": "b5ffa139bd193706f363b49de25c288e2b93d71e",
"compare": "https://github.com/tulios/kafkajs/compare/v1.12.0...b5ffa139bd193706f363b49de25c288e2b93d71e"
"sha": "7c454fd713ff85c8cc87e90709f6893ce36a284b",
"compare": "https://github.com/tulios/kafkajs/compare/v1.12.0...7c454fd713ff85c8cc87e90709f6893ce36a284b"
}
}

@@ -78,2 +78,3 @@ const createRetry = require('../retry')

logger: logger.namespace('RequestQueue'),
isConnected: () => this.connected,
})

@@ -113,2 +114,3 @@

this.connected = true
this.requestQueue.scheduleRequestTimeoutCheck()
resolve(true)

@@ -204,2 +206,3 @@ }

this.logDebug('disconnecting...')
this.requestQueue.destroy()
this.connected = false

@@ -206,0 +209,0 @@ this.socket.end()

@@ -25,2 +25,3 @@ const SocketRequest = require('./socketRequest')

logger,
isConnected = () => true,
}) {

@@ -34,2 +35,3 @@ this.instrumentationEmitter = instrumentationEmitter

this.logger = logger
this.isConnected = isConnected

@@ -50,2 +52,23 @@ this.inflight = new Map()

/**
* @public
*/
scheduleRequestTimeoutCheck() {
if (this.enforceRequestTimeout) {
this.destroy()
this.requestTimeoutIntervalId = setInterval(() => {
this.inflight.forEach(request => {
if (Date.now() - request.sentAt > request.requestTimeout) {
request.timeoutRequest()
}
if (!this.isConnected()) {
this.destroy()
}
})
}, Math.min(this.requestTimeout, 100))
}
}
/**
* @typedef {Object} PushedRequest

@@ -75,3 +98,2 @@ * @property {RequestEntry} entry

instrumentationEmitter: this.instrumentationEmitter,
enforceRequestTimeout: this.enforceRequestTimeout,
requestTimeout,

@@ -176,2 +198,9 @@ send: () => {

}
/**
* @public
*/
destroy() {
clearInterval(this.requestTimeoutIntervalId)
}
}

@@ -52,3 +52,2 @@ const { KafkaJSRequestTimeoutError, KafkaJSNonRetriableError } = require('../../errors')

requestTimeout,
enforceRequestTimeout,
broker,

@@ -64,3 +63,2 @@ clientId,

this.requestTimeout = requestTimeout
this.enforceRequestTimeout = enforceRequestTimeout
this.broker = broker

@@ -77,3 +75,2 @@ this.clientId = clientId

this.pendingDuration = null
this.timeoutId = null

@@ -95,28 +92,24 @@ this[PRIVATE.STATE] = REQUEST_STATE.PENDING

this[PRIVATE.STATE] = REQUEST_STATE.SENT
}
const timeoutCallback = () => {
const { apiName, apiKey, apiVersion } = this.entry
const requestInfo = `${apiName}(key: ${apiKey}, version: ${apiVersion})`
const eventData = {
broker: this.broker,
clientId: this.clientId,
correlationId: this.correlationId,
createdAt: this.createdAt,
sentAt: this.sentAt,
pendingDuration: this.pendingDuration,
}
this.timeoutHandler()
this.rejected(new KafkaJSRequestTimeoutError(`Request ${requestInfo} timed out`, eventData))
this[PRIVATE.EMIT_EVENT](events.NETWORK_REQUEST_TIMEOUT, {
...eventData,
apiName,
apiKey,
apiVersion,
})
timeoutRequest() {
const { apiName, apiKey, apiVersion } = this.entry
const requestInfo = `${apiName}(key: ${apiKey}, version: ${apiVersion})`
const eventData = {
broker: this.broker,
clientId: this.clientId,
correlationId: this.correlationId,
createdAt: this.createdAt,
sentAt: this.sentAt,
pendingDuration: this.pendingDuration,
}
if (this.enforceRequestTimeout) {
this.timeoutId = setTimeout(timeoutCallback, this.requestTimeout)
}
this.timeoutHandler()
this.rejected(new KafkaJSRequestTimeoutError(`Request ${requestInfo} timed out`, eventData))
this[PRIVATE.EMIT_EVENT](events.NETWORK_REQUEST_TIMEOUT, {
...eventData,
apiName,
apiKey,
apiVersion,
})
}

@@ -130,3 +123,2 @@

clearTimeout(this.timeoutId)
const { entry, correlationId, broker, clientId, createdAt, sentAt, pendingDuration } = this

@@ -159,3 +151,2 @@

clearTimeout(this.timeoutId)
this[PRIVATE.STATE] = REQUEST_STATE.REJECTED

@@ -162,0 +153,0 @@ this.duration = Date.now() - this.sentAt