New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

kafkajs

Package Overview
Dependencies
Maintainers
2
Versions
299
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

kafkajs - npm Package Compare versions

Comparing version

to
1.13.0-beta.42

src/protocol/requests/fetch/v8/request.js

6

package.json
{
"name": "kafkajs",
"version": "1.13.0-beta.41",
"version": "1.13.0-beta.42",
"description": "A modern Apache Kafka client for node.js",

@@ -82,5 +82,5 @@ "author": "Tulio Ornelas <ornelas.tulio@gmail.com>",

"kafkajs": {
"sha": "fce9c09a00e0d7969f7a79adff73257a557d2af3",
"compare": "https://github.com/tulios/kafkajs/compare/v1.12.0...fce9c09a00e0d7969f7a79adff73257a557d2af3"
"sha": "c8eb41c47a898899ee511e854df1d478079b8595",
"compare": "https://github.com/tulios/kafkajs/compare/v1.12.0...c8eb41c47a898899ee511e854df1d478079b8595"
}
}

@@ -318,2 +318,4 @@ const createRetry = require('../retry')

const payloadDecoded = await response.decode(payload)
// KIP-219: If the response indicates that the client-side needs to throttle, do that.
this.requestQueue.maybeThrottle(payloadDecoded.clientSideThrottleTime)
const data = await response.parse(payloadDecoded)

@@ -320,0 +322,0 @@ const isFetchApi = entry.apiName === 'Fetch'

@@ -39,2 +39,19 @@ const SocketRequest = require('./socketRequest')

/**
* Until when this request queue is throttled and shouldn't send requests
*
* The value represents the timestamp of the end of the throttling in ms-since-epoch. If the value
* is smaller than the current timestamp no throttling is active.
*
* @type {number}
*/
this.throttledUntil = -1
/**
* Timeout id if we have scheduled a check for pending requests due to client-side throttling
*
* @type {null|NodeJS.Timeout}
*/
this.throttleCheckTimeoutId = null
this[PRIVATE.EMIT_QUEUE_SIZE_EVENT] = () => {

@@ -71,2 +88,9 @@ instrumentationEmitter &&

maybeThrottle(clientSideThrottleTime) {
if (clientSideThrottleTime) {
const minimumThrottledUntil = Date.now() + clientSideThrottleTime
this.throttledUntil = Math.max(minimumThrottledUntil, this.throttledUntil)
}
}
/**

@@ -104,26 +128,21 @@ * @typedef {Object} PushedRequest

this.inflight.delete(correlationId)
const pendingRequest = this.pending.pop()
pendingRequest && pendingRequest.send()
this.checkPendingRequests()
},
})
// TODO: Remove the "null" check once this is validated in production and
// can receive a default value
const shouldEnqueue =
this.maxInFlightRequests != null && this.inflight.size >= this.maxInFlightRequests
if (this.canSendSocketRequestImmediately()) {
this.sendSocketRequest(socketRequest)
return
}
if (shouldEnqueue) {
this.pending.push(socketRequest)
this.pending.push(socketRequest)
this.scheduleCheckPendingRequests()
this.logger.debug(`Request enqueued`, {
clientId: this.clientId,
broker: this.broker,
correlationId,
})
this.logger.debug(`Request enqueued`, {
clientId: this.clientId,
broker: this.broker,
correlationId,
})
this[PRIVATE.EMIT_QUEUE_SIZE_EVENT]()
return
}
this.sendSocketRequest(socketRequest)
this[PRIVATE.EMIT_QUEUE_SIZE_EVENT]()
}

@@ -157,20 +176,6 @@

const socketRequest = this.inflight.get(correlationId)
this.inflight.delete(correlationId)
if (this.pending.length > 0) {
const pendingRequest = this.pending.pop()
this.sendSocketRequest(pendingRequest)
this.checkPendingRequests()
this.logger.debug(`Consumed pending request`, {
clientId: this.clientId,
broker: this.broker,
correlationId: pendingRequest.correlationId,
pendingDuration: pendingRequest.pendingDuration,
currentPendingQueueSize: this.pending.length,
})
this[PRIVATE.EMIT_QUEUE_SIZE_EVENT]()
}
this.inflight.delete(correlationId)
if (!socketRequest) {

@@ -211,3 +216,59 @@ this.logger.warn(`Response without match`, {

clearInterval(this.requestTimeoutIntervalId)
clearTimeout(this.throttleCheckTimeoutId)
this.throttleCheckTimeoutId = null
}
canSendSocketRequestImmediately() {
const shouldEnqueue =
(this.maxInFlightRequests != null && this.inflight.size >= this.maxInFlightRequests) ||
this.throttledUntil > Date.now()
return !shouldEnqueue
}
/**
* Check and process pending requests either now or in the future
*
* This function will send out as many pending requests as possible taking throttling and
* in-flight limits into account.
*/
checkPendingRequests() {
while (this.pending.length > 0 && this.canSendSocketRequestImmediately()) {
const pendingRequest = this.pending.pop()
this.sendSocketRequest(pendingRequest)
this.logger.debug(`Consumed pending request`, {
clientId: this.clientId,
broker: this.broker,
correlationId: pendingRequest.correlationId,
pendingDuration: pendingRequest.pendingDuration,
currentPendingQueueSize: this.pending.length,
})
this[PRIVATE.EMIT_QUEUE_SIZE_EVENT]()
}
this.scheduleCheckPendingRequests()
}
/**
* Ensure that pending requests will be checked in the future
*
* If there is a client-side throttling in place this will ensure that we will check
* the pending request queue eventually.
*/
scheduleCheckPendingRequests() {
// If we're throttled: Schedule checkPendingRequests when the throttle
// should be resolved. If there is already something scheduled we assume that that
// will be fine, and potentially fix up a new timeout if needed at that time.
// Note that if we're merely "overloaded" by having too many inflight requests
// we will anyways check the queue when one of them gets fulfilled.
const timeUntilUnthrottled = this.throttledUntil - Date.now()
if (timeUntilUnthrottled > 0 && !this.throttleCheckTimeoutId) {
this.throttleCheckTimeoutId = setTimeout(() => {
this.throttleCheckTimeoutId = null
this.checkPendingRequests()
}, timeUntilUnthrottled)
}
}
}

@@ -128,2 +128,31 @@ const ISOLATION_LEVEL = require('../../isolationLevel')

},
8: ({
replicaId = REPLICA_ID,
isolationLevel = ISOLATION_LEVEL.READ_COMMITTED,
sessionId = 0,
sessionEpoch = -1,
forgottenTopics = [],
maxWaitTime,
minBytes,
maxBytes,
topics,
}) => {
const request = require('./v8/request')
const response = require('./v8/response')
return {
request: request({
replicaId,
isolationLevel,
sessionId,
sessionEpoch,
forgottenTopics,
maxWaitTime,
minBytes,
maxBytes,
topics,
}),
response,
requestTimeout: requestTimeout(maxWaitTime),
}
},
}

@@ -130,0 +159,0 @@