Comparing version 1.3.0 to 1.3.1
@@ -8,2 +8,12 @@ # Changelog | ||
## [1.3.1] - 2018-08-20 | ||
### Fixed | ||
- Client logger accessor #106 | ||
- Producer v3 decode format #114 | ||
- Parsing multiple responses #115 | ||
- Fetch v4 for partial messages on record batch #116 | ||
### Added | ||
- Connection instrumentation events #110 | ||
## [1.3.0] - 2018-08-06 | ||
@@ -18,3 +28,3 @@ ### Fixed | ||
- Notify user when setting heartbeat interval to same or higher than session timeout #91 | ||
- Constantly refresh metatada based on `metadataMaxAge` #94 | ||
- Constantly refresh metadata based on `metadataMaxAge` #94 | ||
- New instrumentation events #95 | ||
@@ -21,0 +31,0 @@ - Expose loggers #97 #102 |
{ | ||
"name": "kafkajs", | ||
"version": "1.3.0", | ||
"version": "1.3.1", | ||
"description": "A modern Apache Kafka client for node.js", | ||
@@ -28,6 +28,16 @@ "author": "Tulio Ornelas <ornelas.tulio@gmail.com>", | ||
"test:debug": "NODE_ENV=test node --inspect-brk node_modules/.bin/jest --runInBand --watch", | ||
"test": "NODE_ENV=test yarn lint && JEST_JUNIT_OUTPUT=test-report.xml ./scripts/testWithKafka.sh './node_modules/.bin/jest --maxWorkers=4 --no-watchman --forceExit'", | ||
"test": "NODE_ENV=test yarn lint && JEST_JUNIT_OUTPUT=test-report.xml ./scripts/testWithKafka.sh './node_modules/.bin/jest --ci --maxWorkers=4 --no-watchman --forceExit'", | ||
"lint": "find . -path ./node_modules -prune -o -path ./coverage -prune -o -name '*.js' -print0 | xargs -0 ./node_modules/.bin/eslint", | ||
"format": "find . -path ./node_modules -prune -o -path ./coverage -prune -o -name '*.js' -print0 | xargs -0 ./node_modules/.bin/prettier --write", | ||
"precommit": "lint-staged" | ||
"precommit": "lint-staged", | ||
"test:group:broker": "./node_modules/.bin/jest --forceExit --testPathPattern 'src/broker/.*'", | ||
"test:group:admin": "NODE_ENV=test ./node_modules/.bin/jest --forceExit --testPathPattern 'src/admin/.*'", | ||
"test:group:producer": "NODE_ENV=test ./node_modules/.bin/jest --forceExit --testPathPattern 'src/producer/.*'", | ||
"test:group:consumer": "NODE_ENV=test ./node_modules/.bin/jest --forceExit --testPathPattern 'src/consumer/.*'", | ||
"test:group:others": "NODE_ENV=test ./node_modules/.bin/jest --forceExit --testPathPattern 'src/(?!(broker|admin|producer|consumer)/).*'", | ||
"test:group:broker:ci": "NODE_ENV=test JEST_JUNIT_OUTPUT=test-report.xml ./scripts/testWithKafka.sh \"yarn test:group:broker --ci --maxWorkers=4 --no-watchman\"", | ||
"test:group:admin:ci": "NODE_ENV=test JEST_JUNIT_OUTPUT=test-report.xml ./scripts/testWithKafka.sh \"yarn test:group:admin --ci --maxWorkers=4 --no-watchman\"", | ||
"test:group:producer:ci": "NODE_ENV=test JEST_JUNIT_OUTPUT=test-report.xml ./scripts/testWithKafka.sh \"yarn test:group:producer --ci --maxWorkers=4 --no-watchman\"", | ||
"test:group:consumer:ci": "NODE_ENV=test JEST_JUNIT_OUTPUT=test-report.xml ./scripts/testWithKafka.sh \"yarn test:group:consumer --ci --maxWorkers=4 --no-watchman\"", | ||
"test:group:others:ci": "NODE_ENV=test JEST_JUNIT_OUTPUT=test-report.xml ./scripts/testWithKafka.sh \"yarn test:group:others --ci --maxWorkers=4 --no-watchman\"" | ||
}, | ||
@@ -34,0 +44,0 @@ "devDependencies": { |
@@ -42,3 +42,3 @@ [](https://github.com/tulios/kafkajs) | ||
- [Options](#consuming-messages-options) | ||
- [Pause, Resume, & Seek](#consuming-messages-pause-resume) | ||
- [Pause & Resume](#consuming-messages-pause-resume) | ||
- [Seek](#consuming-messages-seek) | ||
@@ -133,3 +133,3 @@ - [Custom partition assigner](#consuming-messages-custom-partition-assigner) | ||
The `retry` option can be used to set the configuration of the retry mechanism, which is be used to retry connections and API calls to Kafka (when using producers or consumers). | ||
The `retry` option can be used to set the configuration of the retry mechanism, which is used to retry connections and API calls to Kafka (when using producers or consumers). | ||
@@ -722,3 +722,3 @@ The retry mechanism uses a randomization function that grows exponentially. | ||
The admin client will host all the cluster operations, such as: `createTopics`, `createPartitions`, etc. Currently, only `createTopics` is available. | ||
The admin client will host all the cluster operations, such as: `createTopics`, `createPartitions`, etc. | ||
@@ -796,3 +796,3 @@ ```javascript | ||
`setOffsets` allows you to set the consumer group offset to any value offset. | ||
`setOffsets` allows you to set the consumer group offset to any value. | ||
@@ -833,3 +833,3 @@ ```javascript | ||
Some operations are instrumented using the `EventEmitter`. To receive the events use the method `consumer#on`, example: | ||
Some operations are instrumented using the `EventEmitter`. To receive the events use the method `consumer#on`, `producer#on` and `admin#on`, example: | ||
@@ -844,4 +844,17 @@ ```javascript | ||
Instrumentation Event: | ||
```javascript | ||
{ | ||
id: <Number>, | ||
type: <String>, | ||
timestamp: <Number>, | ||
payload: <Object> | ||
} | ||
``` | ||
List of available events: | ||
### Consumer | ||
* consumer.events.HEARTBEAT | ||
@@ -865,13 +878,20 @@ payload: {`groupId`, `memberId`, `groupGenerationId`} | ||
Instrumentation Event: | ||
* consumer.events.CONNECT | ||
```javascript | ||
{ | ||
id: <Number>, | ||
type: <String>, | ||
timestamp: <Number>, | ||
payload: <Object> | ||
} | ||
``` | ||
* consumer.events.DISCONNECT | ||
* consumer.events.STOP | ||
### Producer | ||
* producer.events.CONNECT | ||
* producer.events.DISCONNECT | ||
### Admin | ||
* admin.events.CONNECT | ||
* admin.events.DISCONNECT | ||
## <a name="custom-logging"></a> Custom logging | ||
@@ -878,0 +898,0 @@ |
const createRetry = require('../retry') | ||
const waitFor = require('../utils/waitFor') | ||
const createConsumer = require('../consumer') | ||
const InstrumentationEventEmitter = require('../instrumentation/emitter') | ||
const events = require('./instrumentationEvents') | ||
const { CONNECT, DISCONNECT } = require('./instrumentationEvents') | ||
const { LEVELS } = require('../loggers') | ||
const { KafkaJSNonRetriableError } = require('../errors') | ||
const { values, keys } = Object | ||
const eventNames = values(events) | ||
const eventKeys = keys(events) | ||
.map(key => `admin.events.${key}`) | ||
.join(', ') | ||
const retryOnLeaderNotAvailable = (fn, opts = {}) => { | ||
@@ -35,2 +44,3 @@ const callback = async () => { | ||
const logger = rootLogger.namespace('Admin') | ||
const instrumentationEmitter = new InstrumentationEventEmitter() | ||
@@ -40,3 +50,6 @@ /** | ||
*/ | ||
const connect = async () => await cluster.connect() | ||
const connect = async () => { | ||
await cluster.connect() | ||
instrumentationEmitter.emit(CONNECT) | ||
} | ||
@@ -46,3 +59,6 @@ /** | ||
*/ | ||
const disconnect = async () => await cluster.disconnect() | ||
const disconnect = async () => { | ||
await cluster.disconnect() | ||
instrumentationEmitter.emit(DISCONNECT) | ||
} | ||
@@ -222,2 +238,22 @@ /** | ||
/** | ||
* @param {string} eventName | ||
* @param {Function} listener | ||
* @return {Function} | ||
*/ | ||
const on = (eventName, listener) => { | ||
if (!eventNames.includes(eventName)) { | ||
throw new KafkaJSNonRetriableError(`Event name should be one of ${eventKeys}`) | ||
} | ||
return instrumentationEmitter.addListener(eventName, event => { | ||
Promise.resolve(listener(event)).catch(e => { | ||
logger.error(`Failed to execute listener: ${e.message}`, { | ||
eventName, | ||
stack: e.stack, | ||
}) | ||
}) | ||
}) | ||
} | ||
/** | ||
* @return {Object} logger | ||
@@ -231,3 +267,5 @@ */ | ||
createTopics, | ||
events, | ||
fetchOffsets, | ||
on, | ||
setOffsets, | ||
@@ -234,0 +272,0 @@ resetOffsets, |
@@ -7,2 +7,3 @@ const Long = require('long') | ||
const InstrumentationEventEmitter = require('../instrumentation/emitter') | ||
const { CONNECT, DISCONNECT, STOP } = require('./instrumentationEvents') | ||
const { KafkaJSNonRetriableError } = require('../errors') | ||
@@ -91,3 +92,6 @@ const { roundRobin } = require('./assigners') | ||
*/ | ||
const connect = async () => await cluster.connect() | ||
const connect = async () => { | ||
await cluster.connect() | ||
instrumentationEmitter.emit(CONNECT) | ||
} | ||
@@ -102,2 +106,3 @@ /** | ||
await cluster.disconnect() | ||
instrumentationEmitter.emit(DISCONNECT) | ||
} catch (e) {} | ||
@@ -115,2 +120,3 @@ } | ||
consumerGroup = null | ||
instrumentationEmitter.emit(STOP) | ||
} | ||
@@ -117,0 +123,0 @@ |
@@ -11,2 +11,5 @@ const InstrumentationEventType = require('../instrumentation/eventType') | ||
END_BATCH_PROCESS: consumerType('end_batch_process'), | ||
CONNECT: consumerType('connect'), | ||
DISCONNECT: consumerType('disconnect'), | ||
STOP: consumerType('stop'), | ||
} |
@@ -8,4 +8,6 @@ const { createLogger, LEVELS: { INFO } } = require('./loggers') | ||
const { assign } = Object | ||
const privateCreateCluster = Symbol('private:Kafka:createCluster') | ||
const PRIVATE = { | ||
CREATE_CLUSTER: Symbol('private:Kafka:createCluster'), | ||
LOGGER: Symbol('private:Kafka:logger'), | ||
} | ||
@@ -25,6 +27,6 @@ module.exports = class Client { | ||
}) { | ||
this.logger = createLogger({ level: logLevel, logCreator }) | ||
this[privateCreateCluster] = (metadataMaxAge = 300000) => | ||
this[PRIVATE.LOGGER] = createLogger({ level: logLevel, logCreator }) | ||
this[PRIVATE.CREATE_CLUSTER] = (metadataMaxAge = 300000) => | ||
new Cluster({ | ||
logger: this.logger, | ||
logger: this[PRIVATE.LOGGER], | ||
brokers, | ||
@@ -46,6 +48,6 @@ ssl, | ||
producer({ createPartitioner, retry, metadataMaxAge } = {}) { | ||
const cluster = this[privateCreateCluster](metadataMaxAge) | ||
const cluster = this[PRIVATE.CREATE_CLUSTER](metadataMaxAge) | ||
return createProducer({ | ||
retry: assign({}, cluster.retry, retry), | ||
logger: this.logger, | ||
retry: { ...cluster.retry, ...retry }, | ||
logger: this[PRIVATE.LOGGER], | ||
cluster, | ||
@@ -73,6 +75,6 @@ createPartitioner, | ||
) { | ||
const cluster = this[privateCreateCluster](metadataMaxAge) | ||
const cluster = this[PRIVATE.CREATE_CLUSTER](metadataMaxAge) | ||
return createConsumer({ | ||
retry: assign({}, cluster.retry, retry), | ||
logger: this.logger, | ||
retry: { ...cluster.retry, retry }, | ||
logger: this[PRIVATE.LOGGER], | ||
cluster, | ||
@@ -94,6 +96,6 @@ groupId, | ||
admin({ retry } = {}) { | ||
const cluster = this[privateCreateCluster]() | ||
const cluster = this[PRIVATE.CREATE_CLUSTER]() | ||
return createAdmin({ | ||
retry: assign({}, cluster.retry, retry), | ||
logger: this.logger, | ||
retry: { ...cluster.retry, retry }, | ||
logger: this[PRIVATE.LOGGER], | ||
cluster, | ||
@@ -107,4 +109,4 @@ }) | ||
logger() { | ||
return this.logger | ||
return this[PRIVATE.LOGGER] | ||
} | ||
} |
@@ -307,39 +307,41 @@ const createRetry = require('../retry') | ||
// Not enough bytes to read the expected response size, keep buffering | ||
if (Buffer.byteLength(this.buffer) <= Decoder.int32Size()) { | ||
return | ||
} | ||
// Process data if there are enough bytes to read the expected response size, | ||
// otherwise keep buffering | ||
while (Buffer.byteLength(this.buffer) > Decoder.int32Size()) { | ||
const data = Buffer.from(this.buffer) | ||
const decoder = new Decoder(data) | ||
const expectedResponseSize = decoder.readInt32() | ||
const data = Buffer.from(this.buffer) | ||
const decoder = new Decoder(data) | ||
const expectedResponseSize = decoder.readInt32() | ||
if (!decoder.canReadBytes(expectedResponseSize)) { | ||
return | ||
} | ||
if (!decoder.canReadBytes(expectedResponseSize)) { | ||
return | ||
} | ||
const response = new Decoder(decoder.readBytes(expectedResponseSize)) | ||
// Reset the buffer as the rest of the bytes | ||
this.buffer = decoder.readAll() | ||
// The full payload is loaded, erase the temporary buffer | ||
this.buffer = Buffer.alloc(0) | ||
if (this.authHandlers) { | ||
const rawResponseSize = Decoder.int32Size() + expectedResponseSize | ||
const rawResponseBuffer = data.slice(0, rawResponseSize) | ||
return this.authHandlers.onSuccess(rawResponseBuffer) | ||
} | ||
if (this.authHandlers) { | ||
return this.authHandlers.onSuccess(data) | ||
} | ||
const correlationId = response.readInt32() | ||
const payload = response.readAll() | ||
const correlationId = decoder.readInt32() | ||
const payload = decoder.readAll() | ||
const entry = this.pendingQueue[correlationId] | ||
delete this.pendingQueue[correlationId] | ||
const entry = this.pendingQueue[correlationId] | ||
delete this.pendingQueue[correlationId] | ||
if (!entry) { | ||
this.logDebug(`Response without match`, { correlationId }) | ||
return | ||
} | ||
if (!entry) { | ||
this.logDebug(`Response without match`, { correlationId }) | ||
return | ||
entry.resolve({ | ||
size: expectedResponseSize, | ||
correlationId, | ||
entry, | ||
payload, | ||
}) | ||
} | ||
entry.resolve({ | ||
size: expectedResponseSize, | ||
correlationId, | ||
entry, | ||
payload, | ||
}) | ||
} | ||
@@ -346,0 +348,0 @@ |
const createRetry = require('../retry') | ||
const createDefaultPartitioner = require('./partitioners/default') | ||
const createSendMessages = require('./sendMessages') | ||
const InstrumentationEventEmitter = require('../instrumentation/emitter') | ||
const events = require('./instrumentationEvents') | ||
const { CONNECT, DISCONNECT } = require('./instrumentationEvents') | ||
const { KafkaJSNonRetriableError } = require('../errors') | ||
const { values, keys } = Object | ||
const eventNames = values(events) | ||
const eventKeys = keys(events) | ||
.map(key => `producer.events.${key}`) | ||
.join(', ') | ||
module.exports = ({ | ||
@@ -14,2 +23,3 @@ cluster, | ||
const retrier = createRetry(Object.assign({}, cluster.retry, retry)) | ||
const instrumentationEmitter = new InstrumentationEventEmitter() | ||
const logger = rootLogger.namespace('Producer') | ||
@@ -112,2 +122,22 @@ const sendMessages = createSendMessages({ logger, cluster, partitioner }) | ||
/** | ||
* @param {string} eventName | ||
* @param {Function} listener | ||
* @return {Function} | ||
*/ | ||
const on = (eventName, listener) => { | ||
if (!eventNames.includes(eventName)) { | ||
throw new KafkaJSNonRetriableError(`Event name should be one of ${eventKeys}`) | ||
} | ||
return instrumentationEmitter.addListener(eventName, event => { | ||
Promise.resolve(listener(event)).catch(e => { | ||
logger.error(`Failed to execute listener: ${e.message}`, { | ||
eventName, | ||
stack: e.stack, | ||
}) | ||
}) | ||
}) | ||
} | ||
/** | ||
* @returns {Object} logger | ||
@@ -121,3 +151,6 @@ */ | ||
*/ | ||
connect: async () => await cluster.connect(), | ||
connect: async () => { | ||
await cluster.connect() | ||
instrumentationEmitter.emit(CONNECT) | ||
}, | ||
@@ -127,4 +160,11 @@ /** | ||
*/ | ||
disconnect: async () => await cluster.disconnect(), | ||
disconnect: async () => { | ||
await cluster.disconnect() | ||
instrumentationEmitter.emit(DISCONNECT) | ||
}, | ||
events, | ||
on, | ||
send, | ||
@@ -131,0 +171,0 @@ |
@@ -93,5 +93,3 @@ const Long = require('long') | ||
readBytes() { | ||
const byteLength = this.readInt32() | ||
readBytes(byteLength = this.readInt32()) { | ||
if (byteLength === -1) { | ||
@@ -98,0 +96,0 @@ return null |
@@ -29,3 +29,9 @@ const Decoder = require('../../../decoder') | ||
const decodeMessages = async decoder => { | ||
const messagesBuffer = decoder.readBytes() | ||
const messagesSize = decoder.readInt32() | ||
if (messagesSize <= 0 || !decoder.canReadBytes(messagesSize)) { | ||
return [] | ||
} | ||
const messagesBuffer = decoder.readBytes(messagesSize) | ||
const magicByte = messagesBuffer.slice(MAGIC_OFFSET).readInt8() | ||
@@ -32,0 +38,0 @@ |
@@ -26,3 +26,3 @@ const Decoder = require('../../../decoder') | ||
const decoder = new Decoder(rawData) | ||
const responses = decoder.readArray(decoder => ({ | ||
const topics = decoder.readArray(decoder => ({ | ||
topicName: decoder.readString(), | ||
@@ -35,3 +35,3 @@ partitions: decoder.readArray(partition), | ||
return { | ||
responses, | ||
topics, | ||
throttleTime, | ||
@@ -42,3 +42,3 @@ } | ||
const parse = async data => { | ||
const partitionsWithError = data.responses.map(response => { | ||
const partitionsWithError = data.topics.map(response => { | ||
return response.partitions.filter(partition => failure(partition.errorCode)) | ||
@@ -45,0 +45,0 @@ }) |
441191
162
8091
1045