Comparing version
{ | ||
"name": "kafkajs", | ||
"version": "1.13.0-beta.41", | ||
"version": "1.13.0-beta.42", | ||
"description": "A modern Apache Kafka client for node.js", | ||
@@ -82,5 +82,5 @@ "author": "Tulio Ornelas <ornelas.tulio@gmail.com>", | ||
"kafkajs": { | ||
"sha": "fce9c09a00e0d7969f7a79adff73257a557d2af3", | ||
"compare": "https://github.com/tulios/kafkajs/compare/v1.12.0...fce9c09a00e0d7969f7a79adff73257a557d2af3" | ||
"sha": "c8eb41c47a898899ee511e854df1d478079b8595", | ||
"compare": "https://github.com/tulios/kafkajs/compare/v1.12.0...c8eb41c47a898899ee511e854df1d478079b8595" | ||
} | ||
} |
@@ -318,2 +318,4 @@ const createRetry = require('../retry') | ||
const payloadDecoded = await response.decode(payload) | ||
// KIP-219: If the response indicates that the client-side needs to throttle, do that. | ||
this.requestQueue.maybeThrottle(payloadDecoded.clientSideThrottleTime) | ||
const data = await response.parse(payloadDecoded) | ||
@@ -320,0 +322,0 @@ const isFetchApi = entry.apiName === 'Fetch' |
@@ -39,2 +39,19 @@ const SocketRequest = require('./socketRequest') | ||
/** | ||
* Until when this request queue is throttled and shouldn't send requests | ||
* | ||
* The value represents the timestamp of the end of the throttling in ms-since-epoch. If the value | ||
* is smaller than the current timestamp no throttling is active. | ||
* | ||
* @type {number} | ||
*/ | ||
this.throttledUntil = -1 | ||
/** | ||
* Timeout id if we have scheduled a check for pending requests due to client-side throttling | ||
* | ||
* @type {null|NodeJS.Timeout} | ||
*/ | ||
this.throttleCheckTimeoutId = null | ||
this[PRIVATE.EMIT_QUEUE_SIZE_EVENT] = () => { | ||
@@ -71,2 +88,9 @@ instrumentationEmitter && | ||
maybeThrottle(clientSideThrottleTime) { | ||
if (clientSideThrottleTime) { | ||
const minimumThrottledUntil = Date.now() + clientSideThrottleTime | ||
this.throttledUntil = Math.max(minimumThrottledUntil, this.throttledUntil) | ||
} | ||
} | ||
/** | ||
@@ -104,26 +128,21 @@ * @typedef {Object} PushedRequest | ||
this.inflight.delete(correlationId) | ||
const pendingRequest = this.pending.pop() | ||
pendingRequest && pendingRequest.send() | ||
this.checkPendingRequests() | ||
}, | ||
}) | ||
// TODO: Remove the "null" check once this is validated in production and | ||
// can receive a default value | ||
const shouldEnqueue = | ||
this.maxInFlightRequests != null && this.inflight.size >= this.maxInFlightRequests | ||
if (this.canSendSocketRequestImmediately()) { | ||
this.sendSocketRequest(socketRequest) | ||
return | ||
} | ||
if (shouldEnqueue) { | ||
this.pending.push(socketRequest) | ||
this.pending.push(socketRequest) | ||
this.scheduleCheckPendingRequests() | ||
this.logger.debug(`Request enqueued`, { | ||
clientId: this.clientId, | ||
broker: this.broker, | ||
correlationId, | ||
}) | ||
this.logger.debug(`Request enqueued`, { | ||
clientId: this.clientId, | ||
broker: this.broker, | ||
correlationId, | ||
}) | ||
this[PRIVATE.EMIT_QUEUE_SIZE_EVENT]() | ||
return | ||
} | ||
this.sendSocketRequest(socketRequest) | ||
this[PRIVATE.EMIT_QUEUE_SIZE_EVENT]() | ||
} | ||
@@ -157,20 +176,6 @@ | ||
const socketRequest = this.inflight.get(correlationId) | ||
this.inflight.delete(correlationId) | ||
if (this.pending.length > 0) { | ||
const pendingRequest = this.pending.pop() | ||
this.sendSocketRequest(pendingRequest) | ||
this.checkPendingRequests() | ||
this.logger.debug(`Consumed pending request`, { | ||
clientId: this.clientId, | ||
broker: this.broker, | ||
correlationId: pendingRequest.correlationId, | ||
pendingDuration: pendingRequest.pendingDuration, | ||
currentPendingQueueSize: this.pending.length, | ||
}) | ||
this[PRIVATE.EMIT_QUEUE_SIZE_EVENT]() | ||
} | ||
this.inflight.delete(correlationId) | ||
if (!socketRequest) { | ||
@@ -211,3 +216,59 @@ this.logger.warn(`Response without match`, { | ||
clearInterval(this.requestTimeoutIntervalId) | ||
clearTimeout(this.throttleCheckTimeoutId) | ||
this.throttleCheckTimeoutId = null | ||
} | ||
canSendSocketRequestImmediately() { | ||
const shouldEnqueue = | ||
(this.maxInFlightRequests != null && this.inflight.size >= this.maxInFlightRequests) || | ||
this.throttledUntil > Date.now() | ||
return !shouldEnqueue | ||
} | ||
/** | ||
* Check and process pending requests either now or in the future | ||
* | ||
* This function will send out as many pending requests as possible taking throttling and | ||
* in-flight limits into account. | ||
*/ | ||
checkPendingRequests() { | ||
while (this.pending.length > 0 && this.canSendSocketRequestImmediately()) { | ||
const pendingRequest = this.pending.pop() | ||
this.sendSocketRequest(pendingRequest) | ||
this.logger.debug(`Consumed pending request`, { | ||
clientId: this.clientId, | ||
broker: this.broker, | ||
correlationId: pendingRequest.correlationId, | ||
pendingDuration: pendingRequest.pendingDuration, | ||
currentPendingQueueSize: this.pending.length, | ||
}) | ||
this[PRIVATE.EMIT_QUEUE_SIZE_EVENT]() | ||
} | ||
this.scheduleCheckPendingRequests() | ||
} | ||
/** | ||
* Ensure that pending requests will be checked in the future | ||
* | ||
* If there is a client-side throttling in place this will ensure that we will check | ||
* the pending request queue eventually. | ||
*/ | ||
scheduleCheckPendingRequests() { | ||
// If we're throttled: Schedule checkPendingRequests when the throttle | ||
// should be resolved. If there is already something scheduled we assume that that | ||
// will be fine, and potentially fix up a new timeout if needed at that time. | ||
// Note that if we're merely "overloaded" by having too many inflight requests | ||
// we will anyways check the queue when one of them gets fulfilled. | ||
const timeUntilUnthrottled = this.throttledUntil - Date.now() | ||
if (timeUntilUnthrottled > 0 && !this.throttleCheckTimeoutId) { | ||
this.throttleCheckTimeoutId = setTimeout(() => { | ||
this.throttleCheckTimeoutId = null | ||
this.checkPendingRequests() | ||
}, timeUntilUnthrottled) | ||
} | ||
} | ||
} |
@@ -128,2 +128,31 @@ const ISOLATION_LEVEL = require('../../isolationLevel') | ||
}, | ||
8: ({ | ||
replicaId = REPLICA_ID, | ||
isolationLevel = ISOLATION_LEVEL.READ_COMMITTED, | ||
sessionId = 0, | ||
sessionEpoch = -1, | ||
forgottenTopics = [], | ||
maxWaitTime, | ||
minBytes, | ||
maxBytes, | ||
topics, | ||
}) => { | ||
const request = require('./v8/request') | ||
const response = require('./v8/response') | ||
return { | ||
request: request({ | ||
replicaId, | ||
isolationLevel, | ||
sessionId, | ||
sessionEpoch, | ||
forgottenTopics, | ||
maxWaitTime, | ||
minBytes, | ||
maxBytes, | ||
topics, | ||
}), | ||
response, | ||
requestTimeout: requestTimeout(maxWaitTime), | ||
} | ||
}, | ||
} | ||
@@ -130,0 +159,0 @@ |
523016
1.28%291
0.69%15489
1.29%