Comparing version 2.3.0-beta.2 to 2.3.0-beta.3
@@ -8,3 +8,7 @@ # Changelog | ||
## [2.2.3] - 2022-11-21 | ||
### Fixed | ||
- Fix regression in SASL/PLAIN authentication #1480 | ||
## [2.2.2] - 2022-10-18 | ||
@@ -11,0 +15,0 @@ |
{ | ||
"name": "kafkajs", | ||
"version": "2.3.0-beta.2", | ||
"version": "2.3.0-beta.3", | ||
"description": "A modern Apache Kafka client for node.js", | ||
@@ -85,5 +85,5 @@ "author": "Tulio Ornelas <ornelas.tulio@gmail.com>", | ||
"kafkajs": { | ||
"sha": "57d1e09e2737eb5e276f7219de07014b06f23d0b", | ||
"compare": "https://github.com/tulios/kafkajs/compare/v2.2.2...57d1e09e2737eb5e276f7219de07014b06f23d0b" | ||
"sha": "c7135f76ac57a02ac09daa80e251e1d0dedaeb91", | ||
"compare": "https://github.com/tulios/kafkajs/compare/v2.2.3...c7135f76ac57a02ac09daa80e251e1d0dedaeb91" | ||
} | ||
} |
@@ -80,4 +80,4 @@ const Lock = require('../utils/lock') | ||
async connect() { | ||
await this.lock.acquire() | ||
try { | ||
await this.lock.acquire() | ||
if (this.isConnected()) { | ||
@@ -84,0 +84,0 @@ return |
@@ -397,2 +397,3 @@ const BrokerPool = require('./brokerPool') | ||
nodeId, | ||
groupId, | ||
error: e, | ||
@@ -399,0 +400,0 @@ }) |
@@ -5,3 +5,3 @@ const seq = require('../utils/seq') | ||
const createWorkerQueue = require('./workerQueue') | ||
const { KafkaJSFetcherRebalanceError } = require('../errors') | ||
const { KafkaJSFetcherRebalanceError, KafkaJSNoBrokerAvailableError } = require('../errors') | ||
@@ -38,2 +38,6 @@ /** @typedef {ReturnType<typeof createFetchManager>} FetchManager */ | ||
if (nodeIds.length === 0) { | ||
throw new KafkaJSNoBrokerAvailableError() | ||
} | ||
const validateShouldRebalance = () => { | ||
@@ -40,0 +44,0 @@ const current = getNodeIds() |
@@ -147,3 +147,3 @@ const { EventEmitter } = require('events') | ||
if (e.name === 'KafkaJSConnectionError') { | ||
if (e.name === 'KafkaJSNoBrokerAvailableError') { | ||
return bail(e) | ||
@@ -150,0 +150,0 @@ } |
@@ -261,4 +261,13 @@ const pkgJson = require('../package.json') | ||
class KafkaJSNoBrokerAvailableError extends KafkaJSError { | ||
constructor() { | ||
super('No broker available') | ||
this.name = 'KafkaJSNoBrokerAvailableError' | ||
} | ||
} | ||
const isRebalancing = e => | ||
e.type === 'REBALANCE_IN_PROGRESS' || e.type === 'NOT_COORDINATOR_FOR_GROUP' | ||
e.type === 'REBALANCE_IN_PROGRESS' || | ||
e.type === 'NOT_COORDINATOR_FOR_GROUP' || | ||
e.type === 'ILLEGAL_GENERATION' | ||
@@ -297,2 +306,3 @@ const isKafkaJSError = e => e instanceof KafkaJSError | ||
KafkaJSFetcherRebalanceError, | ||
KafkaJSNoBrokerAvailableError, | ||
KafkaJSAlterPartitionReassignmentsError, | ||
@@ -299,0 +309,0 @@ isRebalancing, |
@@ -12,2 +12,3 @@ const { EventEmitter } = require('events') | ||
const REQUEST_QUEUE_EMPTY = 'requestQueueEmpty' | ||
const CHECK_PENDING_REQUESTS_INTERVAL = 10 | ||
@@ -106,3 +107,4 @@ module.exports = class RequestQueue extends EventEmitter { | ||
maybeThrottle(clientSideThrottleTime) { | ||
if (clientSideThrottleTime) { | ||
if (clientSideThrottleTime !== null && clientSideThrottleTime > 0) { | ||
this.logger.debug(`Client side throttling in effect for ${clientSideThrottleTime}ms`) | ||
const minimumThrottledUntil = Date.now() + clientSideThrottleTime | ||
@@ -113,13 +115,3 @@ this.throttledUntil = Math.max(minimumThrottledUntil, this.throttledUntil) | ||
/** | ||
* @typedef {Object} PushedRequest | ||
* @property {import("./socketRequest").RequestEntry} entry | ||
* @property {boolean} expectResponse | ||
* @property {Function} sendRequest | ||
* @property {number} [requestTimeout] | ||
* | ||
* @public | ||
* @param {PushedRequest} pushedRequest | ||
*/ | ||
push(pushedRequest) { | ||
createSocketRequest(pushedRequest) { | ||
const { correlationId } = pushedRequest.entry | ||
@@ -155,2 +147,19 @@ const defaultRequestTimeout = this.requestTimeout | ||
return socketRequest | ||
} | ||
/** | ||
* @typedef {Object} PushedRequest | ||
* @property {import("./socketRequest").RequestEntry} entry | ||
* @property {boolean} expectResponse | ||
* @property {Function} sendRequest | ||
* @property {number} [requestTimeout] | ||
* | ||
* @public | ||
* @param {PushedRequest} pushedRequest | ||
*/ | ||
push(pushedRequest) { | ||
const { correlationId } = pushedRequest.entry | ||
const socketRequest = this.createSocketRequest(pushedRequest) | ||
if (this.canSendSocketRequestImmediately()) { | ||
@@ -307,10 +316,13 @@ this.sendSocketRequest(socketRequest) | ||
// we will anyways check the queue when one of them gets fulfilled. | ||
const timeUntilUnthrottled = this.throttledUntil - Date.now() | ||
if (timeUntilUnthrottled > 0 && !this.throttleCheckTimeoutId) { | ||
let scheduleAt = this.throttledUntil - Date.now() | ||
if (!this.throttleCheckTimeoutId) { | ||
if (this.pending.length > 0) { | ||
scheduleAt = scheduleAt > 0 ? scheduleAt : CHECK_PENDING_REQUESTS_INTERVAL | ||
} | ||
this.throttleCheckTimeoutId = setTimeout(() => { | ||
this.throttleCheckTimeoutId = null | ||
this.checkPendingRequests() | ||
}, timeUntilUnthrottled) | ||
}, scheduleAt) | ||
} | ||
} | ||
} |
731993
21304