Comparing version
{ | ||
"name": "kafkajs", | ||
"version": "1.13.0-beta.5", | ||
"version": "1.13.0-beta.6", | ||
"description": "A modern Apache Kafka client for node.js", | ||
@@ -82,5 +82,5 @@ "author": "Tulio Ornelas <ornelas.tulio@gmail.com>", | ||
"kafkajs": { | ||
"sha": "b5ffa139bd193706f363b49de25c288e2b93d71e", | ||
"compare": "https://github.com/tulios/kafkajs/compare/v1.12.0...b5ffa139bd193706f363b49de25c288e2b93d71e" | ||
"sha": "7c454fd713ff85c8cc87e90709f6893ce36a284b", | ||
"compare": "https://github.com/tulios/kafkajs/compare/v1.12.0...7c454fd713ff85c8cc87e90709f6893ce36a284b" | ||
} | ||
} |
@@ -78,2 +78,3 @@ const createRetry = require('../retry') | ||
logger: logger.namespace('RequestQueue'), | ||
isConnected: () => this.connected, | ||
}) | ||
@@ -113,2 +114,3 @@ | ||
this.connected = true | ||
this.requestQueue.scheduleRequestTimeoutCheck() | ||
resolve(true) | ||
@@ -204,2 +206,3 @@ } | ||
this.logDebug('disconnecting...') | ||
this.requestQueue.destroy() | ||
this.connected = false | ||
@@ -206,0 +209,0 @@ this.socket.end() |
@@ -25,2 +25,3 @@ const SocketRequest = require('./socketRequest') | ||
logger, | ||
isConnected = () => true, | ||
}) { | ||
@@ -34,2 +35,3 @@ this.instrumentationEmitter = instrumentationEmitter | ||
this.logger = logger | ||
this.isConnected = isConnected | ||
@@ -50,2 +52,23 @@ this.inflight = new Map() | ||
/** | ||
* @public | ||
*/ | ||
scheduleRequestTimeoutCheck() { | ||
if (this.enforceRequestTimeout) { | ||
this.destroy() | ||
this.requestTimeoutIntervalId = setInterval(() => { | ||
this.inflight.forEach(request => { | ||
if (Date.now() - request.sentAt > request.requestTimeout) { | ||
request.timeoutRequest() | ||
} | ||
if (!this.isConnected()) { | ||
this.destroy() | ||
} | ||
}) | ||
}, Math.min(this.requestTimeout, 100)) | ||
} | ||
} | ||
/** | ||
* @typedef {Object} PushedRequest | ||
@@ -75,3 +98,2 @@ * @property {RequestEntry} entry | ||
instrumentationEmitter: this.instrumentationEmitter, | ||
enforceRequestTimeout: this.enforceRequestTimeout, | ||
requestTimeout, | ||
@@ -176,2 +198,9 @@ send: () => { | ||
} | ||
/** | ||
* @public | ||
*/ | ||
destroy() { | ||
clearInterval(this.requestTimeoutIntervalId) | ||
} | ||
} |
@@ -52,3 +52,2 @@ const { KafkaJSRequestTimeoutError, KafkaJSNonRetriableError } = require('../../errors') | ||
requestTimeout, | ||
enforceRequestTimeout, | ||
broker, | ||
@@ -64,3 +63,2 @@ clientId, | ||
this.requestTimeout = requestTimeout | ||
this.enforceRequestTimeout = enforceRequestTimeout | ||
this.broker = broker | ||
@@ -77,3 +75,2 @@ this.clientId = clientId | ||
this.pendingDuration = null | ||
this.timeoutId = null | ||
@@ -95,28 +92,24 @@ this[PRIVATE.STATE] = REQUEST_STATE.PENDING | ||
this[PRIVATE.STATE] = REQUEST_STATE.SENT | ||
} | ||
const timeoutCallback = () => { | ||
const { apiName, apiKey, apiVersion } = this.entry | ||
const requestInfo = `${apiName}(key: ${apiKey}, version: ${apiVersion})` | ||
const eventData = { | ||
broker: this.broker, | ||
clientId: this.clientId, | ||
correlationId: this.correlationId, | ||
createdAt: this.createdAt, | ||
sentAt: this.sentAt, | ||
pendingDuration: this.pendingDuration, | ||
} | ||
this.timeoutHandler() | ||
this.rejected(new KafkaJSRequestTimeoutError(`Request ${requestInfo} timed out`, eventData)) | ||
this[PRIVATE.EMIT_EVENT](events.NETWORK_REQUEST_TIMEOUT, { | ||
...eventData, | ||
apiName, | ||
apiKey, | ||
apiVersion, | ||
}) | ||
timeoutRequest() { | ||
const { apiName, apiKey, apiVersion } = this.entry | ||
const requestInfo = `${apiName}(key: ${apiKey}, version: ${apiVersion})` | ||
const eventData = { | ||
broker: this.broker, | ||
clientId: this.clientId, | ||
correlationId: this.correlationId, | ||
createdAt: this.createdAt, | ||
sentAt: this.sentAt, | ||
pendingDuration: this.pendingDuration, | ||
} | ||
if (this.enforceRequestTimeout) { | ||
this.timeoutId = setTimeout(timeoutCallback, this.requestTimeout) | ||
} | ||
this.timeoutHandler() | ||
this.rejected(new KafkaJSRequestTimeoutError(`Request ${requestInfo} timed out`, eventData)) | ||
this[PRIVATE.EMIT_EVENT](events.NETWORK_REQUEST_TIMEOUT, { | ||
...eventData, | ||
apiName, | ||
apiKey, | ||
apiVersion, | ||
}) | ||
} | ||
@@ -130,3 +123,2 @@ | ||
clearTimeout(this.timeoutId) | ||
const { entry, correlationId, broker, clientId, createdAt, sentAt, pendingDuration } = this | ||
@@ -159,3 +151,2 @@ | ||
clearTimeout(this.timeoutId) | ||
this[PRIVATE.STATE] = REQUEST_STATE.REJECTED | ||
@@ -162,0 +153,0 @@ this.duration = Date.now() - this.sentAt |
499119
0.07%14765
0.14%