New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

kafkajs

Package Overview
Dependencies
Maintainers
2
Versions
299
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

kafkajs - npm Package Compare versions

Comparing version 2.3.0-beta.2 to 2.3.0-beta.3

4

CHANGELOG.md

@@ -8,3 +8,7 @@ # Changelog

## [2.2.3] - 2022-11-21
### Fixed
- Fix regression in SASL/PLAIN authentication #1480
## [2.2.2] - 2022-10-18

@@ -11,0 +15,0 @@

6

package.json
{
"name": "kafkajs",
"version": "2.3.0-beta.2",
"version": "2.3.0-beta.3",
"description": "A modern Apache Kafka client for node.js",

@@ -85,5 +85,5 @@ "author": "Tulio Ornelas <ornelas.tulio@gmail.com>",

"kafkajs": {
"sha": "57d1e09e2737eb5e276f7219de07014b06f23d0b",
"compare": "https://github.com/tulios/kafkajs/compare/v2.2.2...57d1e09e2737eb5e276f7219de07014b06f23d0b"
"sha": "c7135f76ac57a02ac09daa80e251e1d0dedaeb91",
"compare": "https://github.com/tulios/kafkajs/compare/v2.2.3...c7135f76ac57a02ac09daa80e251e1d0dedaeb91"
}
}

@@ -80,4 +80,4 @@ const Lock = require('../utils/lock')

async connect() {
await this.lock.acquire()
try {
await this.lock.acquire()
if (this.isConnected()) {

@@ -84,0 +84,0 @@ return

@@ -397,2 +397,3 @@ const BrokerPool = require('./brokerPool')

nodeId,
groupId,
error: e,

@@ -399,0 +400,0 @@ })

@@ -5,3 +5,3 @@ const seq = require('../utils/seq')

const createWorkerQueue = require('./workerQueue')
const { KafkaJSFetcherRebalanceError } = require('../errors')
const { KafkaJSFetcherRebalanceError, KafkaJSNoBrokerAvailableError } = require('../errors')

@@ -38,2 +38,6 @@ /** @typedef {ReturnType<typeof createFetchManager>} FetchManager */

if (nodeIds.length === 0) {
throw new KafkaJSNoBrokerAvailableError()
}
const validateShouldRebalance = () => {

@@ -40,0 +44,0 @@ const current = getNodeIds()

@@ -147,3 +147,3 @@ const { EventEmitter } = require('events')

if (e.name === 'KafkaJSConnectionError') {
if (e.name === 'KafkaJSNoBrokerAvailableError') {
return bail(e)

@@ -150,0 +150,0 @@ }

@@ -261,4 +261,13 @@ const pkgJson = require('../package.json')

class KafkaJSNoBrokerAvailableError extends KafkaJSError {
constructor() {
super('No broker available')
this.name = 'KafkaJSNoBrokerAvailableError'
}
}
const isRebalancing = e =>
e.type === 'REBALANCE_IN_PROGRESS' || e.type === 'NOT_COORDINATOR_FOR_GROUP'
e.type === 'REBALANCE_IN_PROGRESS' ||
e.type === 'NOT_COORDINATOR_FOR_GROUP' ||
e.type === 'ILLEGAL_GENERATION'

@@ -297,2 +306,3 @@ const isKafkaJSError = e => e instanceof KafkaJSError

KafkaJSFetcherRebalanceError,
KafkaJSNoBrokerAvailableError,
KafkaJSAlterPartitionReassignmentsError,

@@ -299,0 +309,0 @@ isRebalancing,

@@ -12,2 +12,3 @@ const { EventEmitter } = require('events')

const REQUEST_QUEUE_EMPTY = 'requestQueueEmpty'
const CHECK_PENDING_REQUESTS_INTERVAL = 10

@@ -106,3 +107,4 @@ module.exports = class RequestQueue extends EventEmitter {

maybeThrottle(clientSideThrottleTime) {
if (clientSideThrottleTime) {
if (clientSideThrottleTime !== null && clientSideThrottleTime > 0) {
this.logger.debug(`Client side throttling in effect for ${clientSideThrottleTime}ms`)
const minimumThrottledUntil = Date.now() + clientSideThrottleTime

@@ -113,13 +115,3 @@ this.throttledUntil = Math.max(minimumThrottledUntil, this.throttledUntil)

/**
* @typedef {Object} PushedRequest
* @property {import("./socketRequest").RequestEntry} entry
* @property {boolean} expectResponse
* @property {Function} sendRequest
* @property {number} [requestTimeout]
*
* @public
* @param {PushedRequest} pushedRequest
*/
push(pushedRequest) {
createSocketRequest(pushedRequest) {
const { correlationId } = pushedRequest.entry

@@ -155,2 +147,19 @@ const defaultRequestTimeout = this.requestTimeout

return socketRequest
}
/**
* @typedef {Object} PushedRequest
* @property {import("./socketRequest").RequestEntry} entry
* @property {boolean} expectResponse
* @property {Function} sendRequest
* @property {number} [requestTimeout]
*
* @public
* @param {PushedRequest} pushedRequest
*/
push(pushedRequest) {
const { correlationId } = pushedRequest.entry
const socketRequest = this.createSocketRequest(pushedRequest)
if (this.canSendSocketRequestImmediately()) {

@@ -307,10 +316,13 @@ this.sendSocketRequest(socketRequest)

// we will anyways check the queue when one of them gets fulfilled.
const timeUntilUnthrottled = this.throttledUntil - Date.now()
if (timeUntilUnthrottled > 0 && !this.throttleCheckTimeoutId) {
let scheduleAt = this.throttledUntil - Date.now()
if (!this.throttleCheckTimeoutId) {
if (this.pending.length > 0) {
scheduleAt = scheduleAt > 0 ? scheduleAt : CHECK_PENDING_REQUESTS_INTERVAL
}
this.throttleCheckTimeoutId = setTimeout(() => {
this.throttleCheckTimeoutId = null
this.checkPendingRequests()
}, timeUntilUnthrottled)
}, scheduleAt)
}
}
}
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc