Comparing version 1.7.0 to 1.8.0
@@ -8,2 +8,15 @@ # Changelog | ||
## [1.8.0] - 2019-05-13 | ||
### Added | ||
- Add partition-aware concurrent mode for `eachMessage` #332 | ||
- Add `JavaCompatiblePartitioner` #358 | ||
- Add `consumer.subscribe({ topic: RegExp })` #346 | ||
- Update supported protocols to latest of Kafka 1 #343 #347 #348 | ||
### Changed | ||
- Add documentation link to `REBALANCE_IN_PROGRESS` error #341 | ||
### Fixed | ||
- Fix crash on offline replicas in metadata v5 response #350 | ||
## [1.7.0] - 2019-04-12 | ||
@@ -10,0 +23,0 @@ ### Fixed |
const Kafka = require('./src') | ||
const PartitionAssigners = require('./src/consumer/assigners') | ||
const AssignerProtocol = require('./src/consumer/assignerProtocol') | ||
const Partitioners = require('./src/producer/partitioners') | ||
const Compression = require('./src/protocol/message/compression') | ||
@@ -12,2 +13,3 @@ const ResourceTypes = require('./src/protocol/resourceTypes') | ||
AssignerProtocol, | ||
Partitioners, | ||
logLevel: LEVELS, | ||
@@ -14,0 +16,0 @@ CompressionTypes: Compression.Types, |
{ | ||
"name": "kafkajs", | ||
"version": "1.7.0", | ||
"version": "1.8.0", | ||
"description": "A modern Apache Kafka client for node.js", | ||
@@ -25,3 +25,3 @@ "author": "Tulio Ornelas <ornelas.tulio@gmail.com>", | ||
"scripts": { | ||
"test:local": "export KAFKA_VERSION=${KAFKA_VERSION:='1.1'} && NODE_ENV=test echo \"KAFKA_VERSION: ${KAFKA_VERSION}\" && KAFKAJS_DEBUG_PROTOCOL_BUFFERS=1 ./node_modules/.bin/jest --forceExit --detectOpenHandles", | ||
"test:local": "export KAFKA_VERSION=${KAFKA_VERSION:='1.1'} && NODE_ENV=test echo \"KAFKA_VERSION: ${KAFKA_VERSION}\" && KAFKAJS_DEBUG_PROTOCOL_BUFFERS=1 ./node_modules/.bin/jest --detectOpenHandles", | ||
"test:debug": "NODE_ENV=test KAFKAJS_DEBUG_PROTOCOL_BUFFERS=1 node --inspect-brk node_modules/.bin/jest --detectOpenHandles --runInBand --watch", | ||
@@ -28,0 +28,0 @@ "test:local:watch": "yarn test:local --watch", |
@@ -39,26 +39,31 @@ # <a href='https://kafka.js.org'><img src='https://raw.githubusercontent.com/tulios/kafkajs/master/logoV2.png' height='60' alt='KafkaJS' aria-label='kafka.js.org' /></a> | ||
// Producing | ||
const producer = kafka.producer() | ||
const consumer = kafka.consumer({ groupId: 'test-group' }) | ||
await producer.connect() | ||
await producer.send({ | ||
topic: 'test-topic', | ||
messages: [ | ||
{ value: 'Hello KafkaJS user!' }, | ||
], | ||
}) | ||
const run = async () => { | ||
// Producing | ||
await producer.connect() | ||
await producer.send({ | ||
topic: 'test-topic', | ||
messages: [ | ||
{ value: 'Hello KafkaJS user!' }, | ||
], | ||
}) | ||
// Consuming | ||
const consumer = kafka.consumer({ groupId: 'test-group' }) | ||
// Consuming | ||
await consumer.connect() | ||
await consumer.subscribe({ topic: 'test-topic', fromBeginning: true }) | ||
await consumer.connect() | ||
await consumer.subscribe({ topic: 'test-topic' }) | ||
await consumer.run({ | ||
eachMessage: async ({ topic, partition, message }) => { | ||
console.log({ | ||
partition, | ||
offset: message.offset, | ||
value: message.value.toString(), | ||
}) | ||
}, | ||
}) | ||
} | ||
await consumer.run({ | ||
eachMessage: async ({ topic, partition, message }) => { | ||
console.log({ | ||
value: message.value.toString(), | ||
}) | ||
}, | ||
}) | ||
run().catch(console.error) | ||
``` | ||
@@ -88,4 +93,10 @@ | ||
### Sponsored by: | ||
<a href="https://www.digitalocean.com/?refcode=9ee868b06152&utm_campaign=Referral_Invite&utm_medium=opensource&utm_source=kafkajs"> | ||
<img src="https://opensource.nyc3.cdn.digitaloceanspaces.com/attribution/assets/SVG/DO_Logo_horizontal_blue.svg" width="201px"> | ||
</a> | ||
## License | ||
See [LICENSE](https://github.com/tulios/kafkajs/blob/master/LICENSE) for more details. |
@@ -346,2 +346,3 @@ const createRetry = require('../retry') | ||
* @param {Array<ResourceConfigQuery>} resources | ||
* @param {boolean} [includeSynonyms=false] | ||
* @return {Promise} | ||
@@ -354,3 +355,3 @@ * | ||
*/ | ||
const describeConfigs = async ({ resources }) => { | ||
const describeConfigs = async ({ resources, includeSynonyms }) => { | ||
if (!resources || !Array.isArray(resources)) { | ||
@@ -398,3 +399,3 @@ throw new KafkaJSNonRetriableError(`Invalid resources array ${resources}`) | ||
const broker = await cluster.findControllerBroker() | ||
const response = await broker.describeConfigs({ resources }) | ||
const response = await broker.describeConfigs({ resources, includeSynonyms }) | ||
return response | ||
@@ -401,0 +402,0 @@ } catch (e) { |
@@ -319,2 +319,4 @@ const Lock = require('../utils/lock') | ||
* no heartbeat after this timeout in ms | ||
* @param {number} rebalanceTimeout The maximum time that the coordinator will wait for each member | ||
* to rejoin when rebalancing the group | ||
* @param {string} [memberId=""] The assigned consumer id or an empty string for a new consumer | ||
@@ -329,2 +331,3 @@ * @param {string} [protocolType="consumer"] Unique name for class of protocols implemented by group | ||
sessionTimeout, | ||
rebalanceTimeout, | ||
memberId = '', | ||
@@ -334,3 +337,2 @@ protocolType = 'consumer', | ||
}) { | ||
// TODO: validate groupId and sessionTimeout (maybe default for sessionTimeout) | ||
const joinGroup = this.lookupRequest(apiKeys.JoinGroup, requests.JoinGroup) | ||
@@ -341,2 +343,3 @@ return await this.connection.send( | ||
sessionTimeout, | ||
rebalanceTimeout, | ||
memberId, | ||
@@ -446,3 +449,3 @@ protocolType, | ||
* @param {string} groupId | ||
* @param {object} topics e.g: | ||
* @param {object} topics - If the topic array is null fetch offsets for all topics. e.g: | ||
* [ | ||
@@ -515,7 +518,8 @@ * { | ||
* }] | ||
* @param {boolean} [includeSynonyms=false] | ||
* @returns {Promise} | ||
*/ | ||
async describeConfigs({ resources }) { | ||
async describeConfigs({ resources, includeSynonyms = false }) { | ||
const describeConfigs = this.lookupRequest(apiKeys.DescribeConfigs, requests.DescribeConfigs) | ||
return await this.connection.send(describeConfigs({ resources })) | ||
return await this.connection.send(describeConfigs({ resources, includeSynonyms })) | ||
} | ||
@@ -522,0 +526,0 @@ |
@@ -153,4 +153,16 @@ const BrokerPool = require('./brokerPool') | ||
async addTargetTopic(topic) { | ||
return this.addMultipleTargetTopics([topic]) | ||
} | ||
/** | ||
* @public | ||
* @param {string[]} topics | ||
* @return {Promise} | ||
*/ | ||
async addMultipleTargetTopics(topics) { | ||
const previousSize = this.targetTopics.size | ||
this.targetTopics.add(topic) | ||
for (let topic of topics) { | ||
this.targetTopics.add(topic) | ||
} | ||
const hasChanged = previousSize !== this.targetTopics.size || !this.brokerPool.metadata | ||
@@ -157,0 +169,0 @@ |
@@ -37,2 +37,3 @@ const flatten = require('../utils/flatten') | ||
sessionTimeout, | ||
rebalanceTimeout, | ||
maxBytesPerPartition, | ||
@@ -55,2 +56,3 @@ minBytes, | ||
this.sessionTimeout = sessionTimeout | ||
this.rebalanceTimeout = rebalanceTimeout | ||
this.maxBytesPerPartition = maxBytesPerPartition | ||
@@ -85,3 +87,3 @@ this.minBytes = minBytes | ||
async join() { | ||
const { groupId, sessionTimeout } = this | ||
const { groupId, sessionTimeout, rebalanceTimeout } = this | ||
@@ -93,2 +95,3 @@ this.coordinator = await this.cluster.findGroupCoordinator({ groupId }) | ||
sessionTimeout, | ||
rebalanceTimeout, | ||
memberId: this.memberId || '', | ||
@@ -95,0 +98,0 @@ groupProtocols: this.assigners.map(assigner => assigner.protocol({ topics: this.topics })), |
@@ -32,2 +32,3 @@ const Long = require('long') | ||
sessionTimeout = 30000, | ||
rebalanceTimeout = 60000, | ||
heartbeatInterval = 3000, | ||
@@ -71,2 +72,3 @@ maxBytesPerPartition = 1048576, // 1MB | ||
sessionTimeout, | ||
rebalanceTimeout, | ||
maxBytesPerPartition, | ||
@@ -83,3 +85,10 @@ minBytes, | ||
const createRunner = ({ eachBatchAutoResolve, eachBatch, eachMessage, onCrash, autoCommit }) => { | ||
const createRunner = ({ | ||
eachBatchAutoResolve, | ||
eachBatch, | ||
eachMessage, | ||
onCrash, | ||
autoCommit, | ||
partitionsConsumedConcurrently, | ||
}) => { | ||
return new Runner({ | ||
@@ -96,2 +105,3 @@ autoCommit, | ||
onCrash, | ||
partitionsConsumedConcurrently, | ||
}) | ||
@@ -137,4 +147,4 @@ } | ||
/** | ||
* @param {string} topic | ||
* @param {string} [fromBeginning=false] | ||
* @param {string | RegExp} topic | ||
* @param {boolean} [fromBeginning=false] | ||
* @return {Promise} | ||
@@ -147,4 +157,33 @@ */ | ||
topics[topic] = { fromBeginning } | ||
await cluster.addTargetTopic(topic) | ||
const isRegExp = topic instanceof RegExp | ||
if (typeof topic !== 'string' && !isRegExp) { | ||
throw new KafkaJSNonRetriableError( | ||
`Invalid topic ${topic} (${typeof topic}), the topic name has to be a String or a RegExp` | ||
) | ||
} | ||
const topicsToSubscribe = [] | ||
if (isRegExp) { | ||
const topicRegExp = topic | ||
const metadata = await cluster.metadata() | ||
const matchedTopics = metadata.topicMetadata | ||
.map(({ topic: topicName }) => topicName) | ||
.filter(topicName => topicRegExp.test(topicName)) | ||
logger.debug('Subscription based on RegExp', { | ||
groupId, | ||
topicRegExp: topicRegExp.toString(), | ||
matchedTopics, | ||
}) | ||
topicsToSubscribe.push(...matchedTopics) | ||
} else { | ||
topicsToSubscribe.push(topic) | ||
} | ||
for (let t of topicsToSubscribe) { | ||
topics[t] = { fromBeginning } | ||
} | ||
await cluster.addMultipleTargetTopics(topicsToSubscribe) | ||
} | ||
@@ -158,2 +197,3 @@ | ||
* the callback succeeds | ||
* @param {number} [partitionsConsumedConcurrently=1] | ||
* @param {Function} [eachBatch=null] | ||
@@ -168,2 +208,3 @@ * @param {Function} [eachMessage=null] | ||
eachBatchAutoResolve = true, | ||
partitionsConsumedConcurrently = 1, | ||
eachBatch = null, | ||
@@ -190,2 +231,3 @@ eachMessage = null, | ||
onCrash, | ||
partitionsConsumedConcurrently, | ||
}) | ||
@@ -192,0 +234,0 @@ |
const createRetry = require('../retry') | ||
const limitConcurrency = require('../utils/concurrency') | ||
const { KafkaJSError } = require('../errors') | ||
@@ -20,2 +21,3 @@ const { | ||
eachBatchAutoResolve = true, | ||
partitionsConsumedConcurrently, | ||
eachBatch, | ||
@@ -38,2 +40,3 @@ eachMessage, | ||
this.autoCommit = autoCommit | ||
this.partitionsConsumedConcurrently = partitionsConsumedConcurrently | ||
@@ -222,11 +225,3 @@ this.running = false | ||
for (let batch of batches) { | ||
if (!this.running) { | ||
break | ||
} | ||
if (batch.isEmpty()) { | ||
continue | ||
} | ||
const onBatch = async batch => { | ||
const startBatchProcess = Date.now() | ||
@@ -257,2 +252,19 @@ const payload = { | ||
const concurrently = limitConcurrency({ limit: this.partitionsConsumedConcurrently }) | ||
await Promise.all( | ||
batches.map(batch => | ||
concurrently(async () => { | ||
if (!this.running) { | ||
return | ||
} | ||
if (batch.isEmpty()) { | ||
return | ||
} | ||
await onBatch(batch) | ||
}) | ||
) | ||
) | ||
await this.autoCommitOffsets() | ||
@@ -259,0 +271,0 @@ await this.consumerGroup.heartbeat({ interval: this.heartbeatInterval }) |
@@ -8,2 +8,3 @@ class KafkaJSError extends Error { | ||
this.retriable = retriable | ||
this.helpUrl = e.helpUrl | ||
} | ||
@@ -10,0 +11,0 @@ } |
@@ -111,2 +111,3 @@ const { | ||
sessionTimeout, | ||
rebalanceTimeout, | ||
heartbeatInterval, | ||
@@ -142,2 +143,3 @@ maxBytesPerPartition, | ||
sessionTimeout, | ||
rebalanceTimeout, | ||
heartbeatInterval, | ||
@@ -144,0 +146,0 @@ maxBytesPerPartition, |
const createRetry = require('../retry') | ||
const createDefaultPartitioner = require('./partitioners/default') | ||
const { DefaultPartitioner } = require('./partitioners/') | ||
const InstrumentationEventEmitter = require('../instrumentation/emitter') | ||
@@ -20,3 +20,3 @@ const createEosManager = require('./eosManager') | ||
logger: rootLogger, | ||
createPartitioner = createDefaultPartitioner, | ||
createPartitioner = DefaultPartitioner, | ||
retry, | ||
@@ -23,0 +23,0 @@ idempotent = false, |
@@ -1,45 +0,4 @@ | ||
const randomBytes = require('./randomBytes') | ||
const murmur2 = require('./murmur2') | ||
const createDefaultPartitioner = require('./partitioner') | ||
// Based on the java client 0.10.2 | ||
// https://github.com/apache/kafka/blob/0.10.2/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java | ||
/** | ||
* A cheap way to deterministically convert a number to a positive value. When the input is | ||
* positive, the original value is returned. When the input number is negative, the returned | ||
* positive value is the original value bit AND against 0x7fffffff which is not its absolutely | ||
* value. | ||
*/ | ||
const toPositive = x => x & 0x7fffffff | ||
/** | ||
* The default partitioning strategy: | ||
* - If a partition is specified in the message, use it | ||
* - If no partition is specified but a key is present choose a partition based on a hash of the key | ||
* - If no partition or key is present choose a partition in a round-robin fashion | ||
*/ | ||
module.exports = () => { | ||
let counter = randomBytes(32).readUInt32BE(0) | ||
return ({ topic, partitionMetadata, message }) => { | ||
const numPartitions = partitionMetadata.length | ||
const availablePartitions = partitionMetadata.filter(p => p.leader >= 0) | ||
const numAvailablePartitions = availablePartitions.length | ||
if (message.partition !== null && message.partition !== undefined) { | ||
return message.partition | ||
} | ||
if (message.key !== null && message.key !== undefined) { | ||
return toPositive(murmur2(message.key)) % numPartitions | ||
} | ||
if (numAvailablePartitions > 0) { | ||
const i = toPositive(++counter) % numAvailablePartitions | ||
return availablePartitions[i].partitionId | ||
} | ||
// no partitions are available, give a non-available partition | ||
return toPositive(++counter) % numPartitions | ||
} | ||
} | ||
module.exports = createDefaultPartitioner(murmur2) |
const { KafkaJSProtocolError } = require('../errors') | ||
const websiteUrl = require('../utils/websiteUrl') | ||
@@ -177,2 +178,3 @@ const errorCodes = [ | ||
message: 'The group is rebalancing, so a rejoin is needed', | ||
helpUrl: websiteUrl('docs/faq', 'what-does-it-mean-to-get-rebalance-in-progress-errors'), | ||
}, | ||
@@ -488,2 +490,35 @@ { | ||
}, | ||
{ | ||
type: 'STALE_BROKER_EPOCH', | ||
code: 77, | ||
retriable: false, | ||
message: 'Broker epoch has changed', | ||
}, | ||
{ | ||
type: 'OFFSET_NOT_AVAILABLE', | ||
code: 78, | ||
retriable: true, | ||
message: | ||
'The leader high watermark has not caught up from a recent leader election so the offsets cannot be guaranteed to be monotonically increasing', | ||
}, | ||
{ | ||
type: 'MEMBER_ID_REQUIRED', | ||
code: 79, | ||
retriable: false, | ||
message: | ||
'The group member needs to have a valid member id before actually entering a consumer group', | ||
}, | ||
{ | ||
type: 'PREFERRED_LEADER_NOT_AVAILABLE', | ||
code: 80, | ||
retriable: true, | ||
message: 'The preferred leader was not available', | ||
}, | ||
{ | ||
type: 'GROUP_MAX_SIZE_REACHED', | ||
code: 81, | ||
retriable: false, | ||
message: | ||
'The consumer group has reached its max size. It already has the configured maximum number of members', | ||
}, | ||
] | ||
@@ -490,0 +525,0 @@ |
@@ -45,2 +45,3 @@ module.exports = { | ||
DeleteGroups: 42, // ApiVersions v2 on Kafka 1.0 | ||
ElectPreferredLeaders: 43, | ||
} |
@@ -12,2 +12,7 @@ const versions = { | ||
}, | ||
2: ({ topics, validateOnly, timeout }) => { | ||
const request = require('./v2/request') | ||
const response = require('./v2/response') | ||
return { request: request({ topics, validateOnly, timeout }), response } | ||
}, | ||
} | ||
@@ -14,0 +19,0 @@ |
@@ -7,2 +7,7 @@ const versions = { | ||
}, | ||
1: ({ topics, timeout }) => { | ||
const request = require('./v1/request') | ||
const response = require('./v1/response') | ||
return { request: request({ topics, timeout }), response } | ||
}, | ||
} | ||
@@ -9,0 +14,0 @@ |
@@ -7,2 +7,7 @@ const versions = { | ||
}, | ||
1: ({ resources, includeSynonyms }) => { | ||
const request = require('./v1/request') | ||
const response = require('./v1/response') | ||
return { request: request({ resources, includeSynonyms }), response } | ||
}, | ||
} | ||
@@ -9,0 +14,0 @@ |
@@ -7,2 +7,7 @@ const versions = { | ||
}, | ||
1: ({ groupIds }) => { | ||
const request = require('./v1/request') | ||
const response = require('./v1/response') | ||
return { request: request({ groupIds }), response } | ||
}, | ||
} | ||
@@ -9,0 +14,0 @@ |
@@ -67,2 +67,63 @@ const ISOLATION_LEVEL = require('../../isolationLevel') | ||
}, | ||
5: ({ | ||
replicaId = REPLICA_ID, | ||
isolationLevel = ISOLATION_LEVEL.READ_COMMITTED, | ||
maxWaitTime, | ||
minBytes, | ||
maxBytes, | ||
topics, | ||
}) => { | ||
const request = require('./v5/request') | ||
const response = require('./v5/response') | ||
return { | ||
request: request({ replicaId, isolationLevel, maxWaitTime, minBytes, maxBytes, topics }), | ||
response, | ||
requestTimeout: requestTimeout(maxWaitTime), | ||
} | ||
}, | ||
6: ({ | ||
replicaId = REPLICA_ID, | ||
isolationLevel = ISOLATION_LEVEL.READ_COMMITTED, | ||
maxWaitTime, | ||
minBytes, | ||
maxBytes, | ||
topics, | ||
}) => { | ||
const request = require('./v6/request') | ||
const response = require('./v6/response') | ||
return { | ||
request: request({ replicaId, isolationLevel, maxWaitTime, minBytes, maxBytes, topics }), | ||
response, | ||
requestTimeout: requestTimeout(maxWaitTime), | ||
} | ||
}, | ||
7: ({ | ||
replicaId = REPLICA_ID, | ||
isolationLevel = ISOLATION_LEVEL.READ_COMMITTED, | ||
sessionId = 0, | ||
sessionEpoch = -1, | ||
forgottenTopics = [], | ||
maxWaitTime, | ||
minBytes, | ||
maxBytes, | ||
topics, | ||
}) => { | ||
const request = require('./v7/request') | ||
const response = require('./v7/response') | ||
return { | ||
request: request({ | ||
replicaId, | ||
isolationLevel, | ||
sessionId, | ||
sessionEpoch, | ||
forgottenTopics, | ||
maxWaitTime, | ||
minBytes, | ||
maxBytes, | ||
topics, | ||
}), | ||
response, | ||
requestTimeout: requestTimeout(maxWaitTime), | ||
} | ||
}, | ||
} | ||
@@ -69,0 +130,0 @@ |
const Decoder = require('../../../decoder') | ||
const { parse: parseV1 } = require('../v1/response') | ||
const MessageSetDecoder = require('../../../messageSet/decoder') | ||
const RecordBatchDecoder = require('../../../recordBatch/v0/decoder') | ||
const { MAGIC_BYTE } = require('../../../recordBatch/v0') | ||
const decodeMessages = require('./decodeMessages') | ||
// the magic offset is at the same offset for all current message formats, but the 4 bytes | ||
// between the size and the magic is dependent on the version. | ||
const MAGIC_OFFSET = 16 | ||
const RECORD_BATCH_OVERHEAD = 49 | ||
/** | ||
@@ -29,37 +22,2 @@ * Fetch Response (Version: 4) => throttle_time_ms [responses] | ||
const decodeMessages = async decoder => { | ||
const messagesSize = decoder.readInt32() | ||
if (messagesSize <= 0 || !decoder.canReadBytes(messagesSize)) { | ||
return [] | ||
} | ||
const messagesBuffer = decoder.readBytes(messagesSize) | ||
const messagesDecoder = new Decoder(messagesBuffer) | ||
const magicByte = messagesBuffer.slice(MAGIC_OFFSET).readInt8(0) | ||
if (magicByte === MAGIC_BYTE) { | ||
let records = [] | ||
while (messagesDecoder.canReadBytes(RECORD_BATCH_OVERHEAD)) { | ||
try { | ||
const recordBatch = await RecordBatchDecoder(messagesDecoder) | ||
records = [...records, ...recordBatch.records] | ||
} catch (e) { | ||
// The tail of the record batches can have incomplete records | ||
// due to how maxBytes works. See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchAPI | ||
if (e.name === 'KafkaJSPartialMessageError') { | ||
break | ||
} | ||
throw e | ||
} | ||
} | ||
return records | ||
} | ||
return MessageSetDecoder(messagesDecoder, messagesSize) | ||
} | ||
const decodeAbortedTransactions = decoder => ({ | ||
@@ -66,0 +24,0 @@ producerId: decoder.readInt64().toString(), |
@@ -10,2 +10,10 @@ const versions = { | ||
}, | ||
1: ({ groupId, groupGenerationId, memberId }) => { | ||
const request = require('./v1/request') | ||
const response = require('./v1/response') | ||
return { | ||
request: request({ groupId, groupGenerationId, memberId }), | ||
response, | ||
} | ||
}, | ||
} | ||
@@ -12,0 +20,0 @@ |
const NETWORK_DELAY = 5000 | ||
/** | ||
* @see https://github.com/apache/kafka/pull/5203 | ||
* The JOIN_GROUP request may block up to sessionTimeout (or rebalanceTimeout in JoinGroupV1), | ||
* so we should override the requestTimeout to be a bit more than the sessionTimeout | ||
* NOTE: the sessionTimeout can be configured as Number.MAX_SAFE_INTEGER and overflow when | ||
* increased, so we have to check for potential overflows | ||
**/ | ||
const requestTimeout = ({ rebalanceTimeout, sessionTimeout }) => { | ||
const timeout = rebalanceTimeout || sessionTimeout | ||
return Number.isSafeInteger(timeout + NETWORK_DELAY) ? timeout + NETWORK_DELAY : timeout | ||
} | ||
const versions = { | ||
@@ -7,12 +20,17 @@ 0: ({ groupId, sessionTimeout, memberId, protocolType, groupProtocols }) => { | ||
/** | ||
* @see https://github.com/apache/kafka/pull/5203 | ||
* The JOIN_GROUP request may block up to sessionTimeout (or rebalanceTimeout in JoinGroupV1), | ||
* so we should override the requestTimeout to be a bit more than the sessionTimeout | ||
* NOTE: the sessionTimeout can be configured as Number.MAX_SAFE_INTEGER and overflow when | ||
* increased, so we have to check for potential overflows | ||
**/ | ||
const requestTimeout = Number.isSafeInteger(sessionTimeout + NETWORK_DELAY) | ||
? sessionTimeout + NETWORK_DELAY | ||
: sessionTimeout | ||
return { | ||
request: request({ | ||
groupId, | ||
sessionTimeout, | ||
memberId, | ||
protocolType, | ||
groupProtocols, | ||
}), | ||
response, | ||
requestTimeout: requestTimeout({ rebalanceTimeout: null, sessionTimeout }), | ||
} | ||
}, | ||
1: ({ groupId, sessionTimeout, rebalanceTimeout, memberId, protocolType, groupProtocols }) => { | ||
const request = require('./v1/request') | ||
const response = require('./v1/response') | ||
@@ -23,2 +41,3 @@ return { | ||
sessionTimeout, | ||
rebalanceTimeout, | ||
memberId, | ||
@@ -29,5 +48,22 @@ protocolType, | ||
response, | ||
requestTimeout, | ||
requestTimeout: requestTimeout({ rebalanceTimeout, sessionTimeout }), | ||
} | ||
}, | ||
2: ({ groupId, sessionTimeout, rebalanceTimeout, memberId, protocolType, groupProtocols }) => { | ||
const request = require('./v2/request') | ||
const response = require('./v2/response') | ||
return { | ||
request: request({ | ||
groupId, | ||
sessionTimeout, | ||
rebalanceTimeout, | ||
memberId, | ||
protocolType, | ||
groupProtocols, | ||
}), | ||
response, | ||
requestTimeout: requestTimeout({ rebalanceTimeout, sessionTimeout }), | ||
} | ||
}, | ||
} | ||
@@ -34,0 +70,0 @@ |
@@ -10,2 +10,10 @@ const versions = { | ||
}, | ||
1: ({ groupId, memberId }) => { | ||
const request = require('./v1/request') | ||
const response = require('./v1/response') | ||
return { | ||
request: request({ groupId, memberId }), | ||
response, | ||
} | ||
}, | ||
} | ||
@@ -12,0 +20,0 @@ |
@@ -27,2 +27,7 @@ const versions = { | ||
}, | ||
5: ({ topics, allowAutoTopicCreation }) => { | ||
const request = require('./v5/request') | ||
const response = require('./v5/response') | ||
return { request: request({ topics, allowAutoTopicCreation }), response } | ||
}, | ||
} | ||
@@ -29,0 +34,0 @@ |
@@ -29,2 +29,16 @@ // This value signals to the broker that its default configuration should be used. | ||
}, | ||
3: ({ groupId, groupGenerationId, memberId, retentionTime = RETENTION_TIME, topics }) => { | ||
const request = require('./v3/request') | ||
const response = require('./v3/response') | ||
return { | ||
request: request({ | ||
groupId, | ||
groupGenerationId, | ||
memberId, | ||
retentionTime, | ||
topics, | ||
}), | ||
response, | ||
} | ||
}, | ||
} | ||
@@ -31,0 +45,0 @@ |
@@ -12,2 +12,7 @@ const versions = { | ||
}, | ||
3: ({ groupId, topics }) => { | ||
const request = require('./v3/request') | ||
const response = require('./v3/response') | ||
return { request: request({ groupId, topics }), response } | ||
}, | ||
} | ||
@@ -14,0 +19,0 @@ |
@@ -33,2 +33,34 @@ const versions = { | ||
}, | ||
4: ({ acks, timeout, compression, topicData, transactionalId, producerId, producerEpoch }) => { | ||
const request = require('./v4/request') | ||
const response = require('./v4/response') | ||
return { | ||
request: request({ | ||
acks, | ||
timeout, | ||
compression, | ||
topicData, | ||
transactionalId, | ||
producerId, | ||
producerEpoch, | ||
}), | ||
response, | ||
} | ||
}, | ||
5: ({ acks, timeout, compression, topicData, transactionalId, producerId, producerEpoch }) => { | ||
const request = require('./v5/request') | ||
const response = require('./v5/response') | ||
return { | ||
request: request({ | ||
acks, | ||
timeout, | ||
compression, | ||
topicData, | ||
transactionalId, | ||
producerId, | ||
producerEpoch, | ||
}), | ||
response, | ||
} | ||
}, | ||
} | ||
@@ -35,0 +67,0 @@ |
@@ -10,2 +10,10 @@ const versions = { | ||
}, | ||
1: ({ groupId, generationId, memberId, groupAssignment }) => { | ||
const request = require('./v1/request') | ||
const response = require('./v1/response') | ||
return { | ||
request: request({ groupId, generationId, memberId, groupAssignment }), | ||
response, | ||
} | ||
}, | ||
} | ||
@@ -12,0 +20,0 @@ |
435439
259
12965
101