Comparing version 1.4.0 to 1.4.1
@@ -8,2 +8,17 @@ # Changelog | ||
## [1.4.1] - 2018-10-17 | ||
### Fixed | ||
- Decode multiple RecordBatch on protocol Fetch v4 #179 | ||
- Skip incomplete record batches #182 | ||
- Producer with `acks=0` never resolve #181 | ||
### Added | ||
- Runtime flag for displaying buffers in debug output #176 | ||
- Add ZSTD to compression codecs and types #157 | ||
- Admin get topic metadata #174 | ||
### Changed | ||
- Add description to lock instances #178 | ||
## [1.4.0] - 2018-10-09 | ||
@@ -10,0 +25,0 @@ |
{ | ||
"name": "kafkajs", | ||
"version": "1.4.0", | ||
"version": "1.4.1", | ||
"description": "A modern Apache Kafka client for node.js", | ||
@@ -25,4 +25,4 @@ "author": "Tulio Ornelas <ornelas.tulio@gmail.com>", | ||
"scripts": { | ||
"test:local": "export KAFKA_VERSION=${KAFKA_VERSION:='0.11'} && NODE_ENV=test echo \"KAFKA_VERSION: ${KAFKA_VERSION}\" && ./node_modules/.bin/jest --forceExit --detectOpenHandles", | ||
"test:debug": "NODE_ENV=test node --inspect-brk node_modules/.bin/jest --detectOpenHandles --runInBand --watch", | ||
"test:local": "export KAFKA_VERSION=${KAFKA_VERSION:='0.11'} && NODE_ENV=test echo \"KAFKA_VERSION: ${KAFKA_VERSION}\" && KAFKAJS_DEBUG_PROTOCOL_BUFFERS=1 ./node_modules/.bin/jest --forceExit --detectOpenHandles", | ||
"test:debug": "NODE_ENV=test KAFKAJS_DEBUG_PROTOCOL_BUFFERS=1 node --inspect-brk node_modules/.bin/jest --detectOpenHandles --runInBand --watch", | ||
"test:local:watch": "yarn test:local --watch", | ||
@@ -29,0 +29,0 @@ "test": "yarn lint && JEST_JUNIT_OUTPUT=test-report.xml ./scripts/testWithKafka.sh 'yarn test:local --ci --maxWorkers=4 --no-watchman'", |
@@ -17,2 +17,3 @@ [](https://github.com/tulios/kafkajs) | ||
- Consumer groups with pause, resume, and seek | ||
- Message headers | ||
- GZIP compression | ||
@@ -57,2 +58,3 @@ - Snappy and LZ4 compression through plugins | ||
- [Delete topics](#admin-delete-topics) | ||
- [Get topic metadata](#admin-get-topic-metadata) | ||
- [Fetch consumer group offsets](#admin-fetch-offsets) | ||
@@ -67,2 +69,3 @@ - [Reset consumer group offsets](#admin-reset-offsets) | ||
- [Development](#development) | ||
- [Environment variables](#environment-variables) | ||
@@ -306,6 +309,11 @@ ## <a name="installation"></a> Installation | ||
topic: 'topic-c', | ||
messages: [{ key: 'key', value: 'hello topic-c' }], | ||
headers: { | ||
'correlation-id': '2bfb68bb-893a-423b-a7fa-7b568cad5b67', | ||
} | ||
messages: [ | ||
{ | ||
key: 'key', | ||
value: 'hello topic-c', | ||
headers: { | ||
'correlation-id': '2bfb68bb-893a-423b-a7fa-7b568cad5b67', | ||
}, | ||
} | ||
], | ||
} | ||
@@ -857,2 +865,46 @@ ] | ||
### <a name="admin-get-topic-metadata"></a> Get topic metadata | ||
```javascript | ||
await admin.getTopicMetadata({ topics: <Array<String> }) | ||
``` | ||
`TopicsMetadata` structure: | ||
```javascript | ||
{ | ||
topics: <Array<TopicMetadata>>, | ||
} | ||
``` | ||
`TopicMetadata` structure: | ||
```javascript | ||
{ | ||
topic: <String>, | ||
partitions: <Array<PartitionMetadata>> // default: 1 | ||
} | ||
``` | ||
`PartitionMetadata` structure: | ||
```javascript | ||
{ | ||
partitionErrorCode: <Number>, // default: 0 | ||
partitionId: <Number>, | ||
leader: <Number>, | ||
replicas: <Array<Number>>, | ||
isr: <Array<Number>>, | ||
} | ||
``` | ||
The admin client will throw an exception if any of the provided topics do not already exist. | ||
If you omit the `topics` argument the admin client will fetch metadata for all topics | ||
of which it is already aware (all the cluster's target topics): | ||
``` | ||
await admin.getTopicMetadata() | ||
``` | ||
### <a name="admin-fetch-offsets"></a> Fetch consumer group offsets | ||
@@ -1271,2 +1323,8 @@ | ||
### <a name="environment-variables"></a> Environment variables | ||
| variable | description | default | | ||
| ------------------------------ | ---------------------------------------- | ------- | | ||
| KAFKAJS_DEBUG_PROTOCOL_BUFFERS | Output raw protocol buffers in debug log | 0 | | ||
## Acknowledgements | ||
@@ -1273,0 +1331,0 @@ |
@@ -422,2 +422,59 @@ const createRetry = require('../retry') | ||
/** | ||
* Fetch metadata for provided topics. | ||
* | ||
* If no topics are provided fetch metadata for all topics of which we are aware. | ||
* @see https://kafka.apache.org/protocol#The_Messages_Metadata | ||
* | ||
* @param {Object} [options] | ||
* @param {Array<string>} [options.topics] | ||
* @return {Promise<TopicsMetadata>} | ||
* | ||
* @typedef {Object} TopicsMetadata | ||
* @property {Array<TopicMetadata>} topics | ||
* | ||
* @typedef {Object} TopicMetadata | ||
* @property {String} name | ||
* @property {Array<PartitionMetadata>} partitions | ||
* | ||
* @typedef {Object} PartitionMetadata | ||
* @property {number} partitionErrorCode Response error code | ||
* @property {number} partitionId Topic partition id | ||
* @property {number} leader The id of the broker acting as leader for this partition. | ||
* @property {Array<number>} replicas The set of all nodes that host this partition. | ||
* @property {Array<number>} isr The set of nodes that are in sync with the leader for this partition. | ||
*/ | ||
const getTopicMetadata = async options => { | ||
const { topics } = options || {} | ||
if (topics) { | ||
await Promise.all( | ||
topics.map(async topic => { | ||
if (!topic) { | ||
throw new KafkaJSNonRetriableError(`Invalid topic ${topic}`) | ||
} | ||
try { | ||
await cluster.addTargetTopic(topic) | ||
} catch (e) { | ||
e.message = `Failed to add target topic ${topic}: ${e.message}` | ||
throw e | ||
} | ||
}) | ||
) | ||
} | ||
await cluster.refreshMetadataIfNecessary() | ||
const targetTopics = topics || [...cluster.targetTopics] | ||
return { | ||
topics: await Promise.all( | ||
targetTopics.map(async topic => ({ | ||
name: topic, | ||
partitions: await cluster.findTopicPartitionMetadata(topic), | ||
})) | ||
), | ||
} | ||
} | ||
/** | ||
* @param {string} eventName | ||
@@ -452,2 +509,3 @@ * @param {Function} listener | ||
deleteTopics, | ||
getTopicMetadata, | ||
events, | ||
@@ -454,0 +512,0 @@ fetchOffsets, |
@@ -41,4 +41,9 @@ const Lock = require('../utils/lock') | ||
const lockTimeout = this.connection.connectionTimeout + this.authenticationTimeout | ||
this.lock = new Lock({ timeout: lockTimeout }) | ||
const brokerAddress = `${this.connection.host}:${this.connection.port}` | ||
this.lock = new Lock({ | ||
timeout: lockTimeout, | ||
description: `connect to broker ${brokerAddress}`, | ||
}) | ||
this.lookupRequest = () => { | ||
@@ -45,0 +50,0 @@ throw new Error('Broker not connected') |
@@ -6,2 +6,3 @@ const createRetry = require('../retry') | ||
const { KafkaJSConnectionError } = require('../errors') | ||
const getEnv = require('../env') | ||
@@ -63,2 +64,3 @@ /** | ||
this.logError = log('error') | ||
this.shouldLogBuffers = getEnv().KAFKAJS_DEBUG_PROTOCOL_BUFFERS === '1' | ||
} | ||
@@ -223,2 +225,4 @@ | ||
this.failIfNotConnected() | ||
const expectResponse = !request.expectResponse || request.expectResponse() | ||
const requestInfo = ({ apiName, apiKey, apiVersion }) => | ||
@@ -235,2 +239,3 @@ `${apiName}(key: ${apiKey}, version: ${apiVersion})` | ||
correlationId, | ||
expectResponse, | ||
size: Buffer.byteLength(requestPayload.buffer), | ||
@@ -242,4 +247,17 @@ }) | ||
this.failIfNotConnected() | ||
this.pendingQueue[correlationId] = { apiKey, apiName, apiVersion, resolve, reject } | ||
this.socket.write(requestPayload.buffer, 'binary') | ||
let entry = { apiKey, apiName, apiVersion } | ||
if (expectResponse) { | ||
entry = { ...entry, resolve, reject } | ||
this.pendingQueue[correlationId] = entry | ||
this.socket.write(requestPayload.buffer, 'binary') | ||
} else { | ||
this.socket.write(requestPayload.buffer, 'binary') | ||
resolve({ | ||
size: 0, | ||
payload: null, | ||
correlationId, | ||
entry, | ||
}) | ||
} | ||
} catch (e) { | ||
@@ -253,2 +271,6 @@ reject(e) | ||
if (!expectResponse) { | ||
return | ||
} | ||
try { | ||
@@ -274,6 +296,8 @@ const payloadDecoded = await response.decode(payload) | ||
const isBuffer = Buffer.isBuffer(payload) | ||
this.logDebug(`Response ${requestInfo(entry)}`, { | ||
error: e.message, | ||
correlationId, | ||
payload, | ||
payload: | ||
isBuffer && !this.shouldLogBuffers ? { type: 'Buffer', data: '[filtered]' } : payload, | ||
}) | ||
@@ -280,0 +304,0 @@ |
@@ -5,5 +5,6 @@ const flatten = require('../utils/flatten') | ||
const partitions = topics.map(({ topicName, partitions }) => | ||
partitions.map(partition => Object.assign({ topicName }, partition)) | ||
partitions.map(partition => ({ topicName, ...partition })) | ||
) | ||
return flatten(partitions) | ||
} |
@@ -80,3 +80,4 @@ const createRetry = require('../retry') | ||
const response = await broker.produce({ acks, timeout, compression, topicData }) | ||
responsePerBroker.set(broker, responseSerializer(response)) | ||
const expectResponse = acks !== 0 | ||
responsePerBroker.set(broker, expectResponse ? responseSerializer(response) : []) | ||
} catch (e) { | ||
@@ -83,0 +84,0 @@ responsePerBroker.delete(broker) |
@@ -11,2 +11,3 @@ const { KafkaJSNotImplemented } = require('../../../errors') | ||
LZ4: 3, | ||
ZSTD: 4, | ||
} | ||
@@ -22,2 +23,5 @@ | ||
}, | ||
[Types.ZSTD]: () => { | ||
throw new KafkaJSNotImplemented('ZSTD compression not implemented') | ||
}, | ||
} | ||
@@ -24,0 +28,0 @@ |
const Decoder = require('../../decoder') | ||
const { KafkaJSPartialMessageError } = require('../../../errors') | ||
const { lookupCodecByRecordBatchAttributes } = require('../../message/compression') | ||
const { KafkaJSPartialMessageError } = require('../../../errors') | ||
const RecordDecoder = require('../record/v0/decoder') | ||
@@ -27,7 +27,9 @@ | ||
module.exports = async decoder => { | ||
const firstOffset = decoder.readInt64().toString() | ||
const length = decoder.readInt32() | ||
module.exports = async fetchDecoder => { | ||
const firstOffset = fetchDecoder.readInt64().toString() | ||
const length = fetchDecoder.readInt32() | ||
const decoder = fetchDecoder.slice(length) | ||
fetchDecoder.forward(length) | ||
const remainingBytes = Buffer.byteLength(decoder.slice(length).buffer) | ||
const remainingBytes = Buffer.byteLength(decoder.buffer) | ||
@@ -62,6 +64,4 @@ if (remainingBytes < length) { | ||
const recordsSize = Buffer.byteLength(decoder.buffer) | ||
const recordsDecoder = decoder.slice(recordsSize) | ||
const recordContext = { firstOffset, firstTimestamp, magicByte } | ||
const records = await decodeRecords(codec, recordsDecoder, recordContext) | ||
const records = await decodeRecords(codec, decoder, recordContext) | ||
@@ -90,3 +90,3 @@ return { | ||
if (length === -1) { | ||
if (length <= 0) { | ||
return [] | ||
@@ -93,0 +93,0 @@ } |
@@ -10,2 +10,3 @@ const Decoder = require('../../../decoder') | ||
const MAGIC_OFFSET = 16 | ||
const RECORD_BATCH_OVERHEAD = 49 | ||
@@ -41,4 +42,18 @@ /** | ||
if (magicByte === MAGIC_BYTE) { | ||
const recordBatch = await RecordBatchDecoder(messagesDecoder) | ||
return recordBatch.records | ||
let records = [] | ||
while (messagesDecoder.canReadBytes(RECORD_BATCH_OVERHEAD)) { | ||
try { | ||
const recordBatch = await RecordBatchDecoder(messagesDecoder) | ||
records = [...records, ...recordBatch.records] | ||
} catch (e) { | ||
// The tail of the record batches can have incomplete records | ||
// due to how maxBytes works. See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchAPI | ||
if (e.name === 'KafkaJSPartialMessageError') { | ||
break | ||
} | ||
} | ||
} | ||
return records | ||
} | ||
@@ -45,0 +60,0 @@ |
@@ -67,2 +67,3 @@ const Encoder = require('../../../encoder') | ||
apiName: 'Produce', | ||
expectResponse: () => acks !== 0, | ||
encode: async () => { | ||
@@ -69,0 +70,0 @@ return new Encoder() |
@@ -13,2 +13,3 @@ const Encoder = require('../../../encoder') | ||
apiName: 'Produce', | ||
expectResponse: () => acks !== 0, | ||
encode: async () => { | ||
@@ -15,0 +16,0 @@ const encodeTopic = topicEncoder(compression) |
@@ -36,2 +36,3 @@ const Encoder = require('../../../encoder') | ||
apiName: 'Produce', | ||
expectResponse: () => acks !== 0, | ||
encode: async () => { | ||
@@ -38,0 +39,0 @@ const encodeTopic = topicEncoder(compression) |
@@ -0,1 +1,2 @@ | ||
const { format } = require('util') | ||
const { KafkaJSLockTimeout } = require('../errors') | ||
@@ -7,9 +8,16 @@ | ||
WAITING: Symbol('private:Lock:waiting'), | ||
TIMEOUT_ERROR_MESSAGE: Symbol('private:Lock:timeoutErrorMessage'), | ||
} | ||
const TIMEOUT_MESSAGE = 'Timeout while acquiring lock (%d waiting locks)' | ||
module.exports = class Lock { | ||
constructor({ timeout = 1000 } = {}) { | ||
constructor({ timeout = 1000, description = null } = {}) { | ||
this[PRIVATE.LOCKED] = false | ||
this[PRIVATE.TIMEOUT] = timeout | ||
this[PRIVATE.WAITING] = new Set() | ||
this[PRIVATE.TIMEOUT_ERROR_MESSAGE] = () => { | ||
const timeoutMessage = format(TIMEOUT_MESSAGE, this[PRIVATE.WAITING].size) | ||
return description ? `${timeoutMessage}: "${description}"` : timeoutMessage | ||
} | ||
} | ||
@@ -36,3 +44,3 @@ | ||
timeoutId = setTimeout( | ||
() => reject(new KafkaJSLockTimeout('Timeout while acquiring lock')), | ||
() => reject(new KafkaJSLockTimeout(this[PRIVATE.TIMEOUT_ERROR_MESSAGE]())), | ||
this[PRIVATE.TIMEOUT] | ||
@@ -39,0 +47,0 @@ ) |
484409
181
9091
1331