Comparing version 0.6.8 to 0.7.0
@@ -8,2 +8,16 @@ # Changelog | ||
## [0.7.0] - 2018-01-19 | ||
### Fixed | ||
- Fix retry on error for message handlers #30 | ||
### Changed | ||
- Use decoder offset for validating response length #21 | ||
- Change log creator to improve interoperability #24 | ||
- Add support to `KAFKAJS_LOG_LEVEL` #24 | ||
- Improved assigner protocol #27 #29 | ||
### Added | ||
- Add seek API to consumer #23 | ||
- Add experimental describe group to consumer #31 | ||
## [0.6.8] - 2017-12-27 | ||
@@ -10,0 +24,0 @@ ### Fixed |
{ | ||
"name": "kafkajs", | ||
"version": "0.6.8", | ||
"version": "0.7.0", | ||
"description": "A modern Apache Kafka client for node.js", | ||
@@ -46,2 +46,3 @@ "author": "Tulio Ornelas <ornelas.tulio@gmail.com>", | ||
"lint-staged": "^6.0.0", | ||
"mockdate": "^2.0.2", | ||
"prettier": "^1.7.0" | ||
@@ -48,0 +49,0 @@ }, |
371
README.md
@@ -6,19 +6,52 @@ [](https://travis-ci.org/tulios/kafkajs) | ||
A modern Apache Kafka client for node.js | ||
A modern Apache Kafka client for node.js. This library is compatible with Kafka `0.10+`. | ||
__In active development - early alpha__ | ||
__In active development - alpha__ | ||
- Fully working producer compatible with 0.10.x (0.9.x will be possible soon) | ||
- Fully working consumer groups compatible with 0.10.x (0.9.x will be possible soon) | ||
## Features | ||
- Producer | ||
- Consumer groups | ||
- GZIP compression | ||
- Plain, SSL and SASL_SSL implementations | ||
## Usage | ||
## Table of Contents | ||
### Setting up the Client | ||
- [Installation](#installation) | ||
- [Usage](#usage) | ||
- [Setting up the Client](#setup-client) | ||
- [SSL](#setup-client-ssl) | ||
- [SASL](#setup-client-sasl) | ||
- [Connection timeout](#setup-client-connection-timeout) | ||
- [Default retry](#setup-client-default-retry) | ||
- [Logger](#setup-client-logger) | ||
- [Producing Messages to Kafka](#producing-messages) | ||
- [Custom partitioner](#producing-messages-custom-partitioner) | ||
- [GZIP compression](#producing-messages-gzip-compression) | ||
- [Retry](#producing-messages-retry) | ||
- [Consuming messages from Kafka](#consuming-messages) | ||
- [eachMessage](#consuming-messages-each-message) | ||
- [eachBatch](#consuming-messages-each-batch) | ||
- [Options](#consuming-messages-options) | ||
- [Custom assigner](#consuming-messages-custom-assigner) | ||
- [Instrumentation](#instrumentation) | ||
- [Development](#development) | ||
## <a name="installation"></a> Installation | ||
```sh | ||
npm install kafkajs | ||
# yarn add kafkajs | ||
``` | ||
## <a name="usage"></a> Usage | ||
### <a name="setup-client"></a> Setting up the Client | ||
The client must be configured with at least one broker. The brokers on the list are considered seed brokers and are only used to bootstrap the client and load initial metadata. | ||
```javascript | ||
const { Kafka } = require('kafkajs') | ||
// Create the client with broker list | ||
// Create the client with the broker list | ||
const kafka = new Kafka({ | ||
@@ -30,5 +63,191 @@ clientId: 'my-app', | ||
### Producing Messages to Kafka | ||
#### <a name="setup-client-ssl"></a> SSL | ||
The `ssl` option can be used to configure the TLS sockets. The options are passed directly to [`tls.connect`](https://nodejs.org/api/tls.html#tls_tls_connect_options_callback) and used to create the TLS Secure Context, all options are accepted. | ||
```javascript | ||
const fs = require('fs') | ||
new Kafka({ | ||
clientId: 'my-app', | ||
brokers: ['kafka1:9092', 'kafka2:9092'], | ||
ssl: { | ||
rejectUnauthorized: false, | ||
ca: [fs.readFileSync('/my/custom/ca.crt', 'utf-8')], | ||
key: fs.readFileSync('/my/custom/client-key.pem', 'utf-8'), | ||
cert: fs.readFileSync('/my/custom/client-cert.pem', 'utf-8') | ||
}, | ||
}) | ||
``` | ||
Take a look at [TLS create secure context](https://nodejs.org/dist/latest-v8.x/docs/api/tls.html#tls_tls_createsecurecontext_options) for more information. | ||
#### <a name="setup-client-sasl"></a> SASL | ||
Kafka has support for using SASL to authenticate clients. The `sasl` option can be used to configure the authentication mechanism. Currently, KafkaJS only supports the `PLAIN` mechanism. | ||
```javascript | ||
new Kafka({ | ||
clientId: 'my-app', | ||
brokers: ['kafka1:9092', 'kafka2:9092'], | ||
sasl: { | ||
mechanism: 'plain', | ||
username: 'my-username', | ||
password: 'my-password' | ||
}, | ||
}) | ||
``` | ||
It is __highly recommended__ that you use SSL for encryption when using `PLAIN`. | ||
#### <a name="setup-client-connection-timeout"></a> Connection Timeout | ||
Time in milliseconds to wait for a successful connection. The default value is: `1000`. | ||
```javascript | ||
new Kafka({ | ||
clientId: 'my-app', | ||
brokers: ['kafka1:9092', 'kafka2:9092'], | ||
connectionTimeout: 3000 | ||
}) | ||
``` | ||
#### <a name="setup-client-default-retry"></a> Default Retry | ||
The `retry` option can be used to set the default configuration. The retry mechanism uses a randomization function that grows exponentially. The configuration will be used to retry connections and API calls to Kafka (when using producers or consumers). | ||
If the max number of retries is exceeded the retrier will throw `KafkaJSNumberOfRetriesExceeded` and interrupt. Producers will bubble up the error to the user code; Consumers will wait the retry time attached to the exception (it will be based on the number of attempts) and perform a full restart. | ||
Available options: | ||
* __maxRetryTime__ - Maximum wait time for a retry in milliseconds. Default: `30000` | ||
* __initialRetryTime__ - Initial value used to calculate the retry in milliseconds (This is still randomized following the randomization factor). Default: `300` | ||
* __factor__ - Randomization factor. Default: `0.2` | ||
* __multiplier__ - Exponential factor. Default: `2` | ||
* __retries__ - Max number of retries per call. Default: `5` | ||
```javascript | ||
new Kafka({ | ||
clientId: 'my-app', | ||
brokers: ['kafka1:9092', 'kafka2:9092'], | ||
retry: { | ||
initialRetryTime: 100, | ||
retries: 8 | ||
} | ||
}) | ||
``` | ||
#### <a name="setup-client-logger"></a> Logger | ||
KafkaJS has a built-in `STDOUT` logger which outputs JSON. It also accepts a custom log creator which allows you to integrate your favorite logger library. | ||
There are 5 log levels available: `NOTHING`, `ERROR`, `WARN`, `INFO`, and `DEBUG`. `INFO` is configured by default. | ||
__How to configure the log level?__ | ||
```javascript | ||
const { Kafka, logLevel } = require('kafkajs') | ||
// Create the client with the broker list | ||
const kafka = new Kafka({ | ||
clientId: 'my-app', | ||
brokers: ['kafka1:9092', 'kafka2:9092'], | ||
logLevel: logLevel.ERROR | ||
}) | ||
``` | ||
The environment variable `KAFKAJS_LOG_LEVEL` can also be used and it has precedence over the configuration in code, example: | ||
```sh | ||
KAFKAJS_LOG_LEVEL=info node code.js | ||
``` | ||
__How to create a log creator?__ | ||
A log creator is a function which receives a log level and returns a log function. The log function receives: namespace, level, label, and log. | ||
`namespace` identifies the component which is performing the log, for example, connection or consumer. | ||
`level` is the log level of the log entry. | ||
`label` is a text representation of the log level, example: 'INFO'. | ||
`log` is an object with the following keys: `timestamp`, `logger`, `message`, and the extra keys given by the user. (`logger.info('test', { extra_data: true })`) | ||
```javascript | ||
{ | ||
level: 4, | ||
label: 'INFO', // NOTHING, ERROR, WARN, INFO, or DEBUG | ||
timestamp: '2017-12-29T13:39:54.575Z', | ||
logger: 'kafkajs', | ||
message: 'Started', | ||
// ... any other extra key provided to the log function | ||
} | ||
``` | ||
The general structure looks like this: | ||
```javascript | ||
const MyLogCreator = logLevel => ({ namespace, level, label, log }) => { | ||
// Example: | ||
// const { timestamp, logger, message, ...others } = log | ||
// console.log(`${label} [${namespace}] ${message} ${JSON.stringify(others)}`) | ||
} | ||
``` | ||
Example using [winston](https://github.com/winstonjs/winston): | ||
```javascript | ||
const { logLevel } = require('kafkajs') | ||
const winston = require('winston') | ||
const toWinstonLogLevel = level => switch(level) { | ||
case logLevel.ERROR: | ||
case logLevel.NOTHING: | ||
return 'error' | ||
case logLevel.WARN: | ||
return 'warn' | ||
case logLevel.INFO: | ||
return 'info' | ||
case logLevel.DEBUG: | ||
return 'debug' | ||
} | ||
const WinstonLogCreator = logLevel => { | ||
const logger = winston.createLogger({ | ||
level: toWinstonLogLevel(logLevel), | ||
transports: [ | ||
new winston.transports.Console(), | ||
new winston.transports.File({ filename: 'myapp.log' }) | ||
] | ||
}) | ||
return ({ namespace, level, { message, ...extra } }) => { | ||
logger.log({ | ||
level: toWinstonLogLevel(level), | ||
message, | ||
extra, | ||
}) | ||
} | ||
} | ||
``` | ||
Once you have your log creator you can use the `logCreator` option to configure the client: | ||
```javascript | ||
const kafka = new Kafka({ | ||
clientId: 'my-app', | ||
brokers: ['kafka1:9092', 'kafka2:9092'], | ||
logLevel: logLevel.ERROR, | ||
logCreator: WinstonLogCreator | ||
}) | ||
``` | ||
### <a name="producing-messages"></a> Producing Messages to Kafka | ||
To publish messages to Kafka you have to create a producer. By default the producer is configured to distribute the messages with the following logic: | ||
- If a partition is specified in the message, use it | ||
- If no partition is specified but a key is present choose a partition based on a hash (murmur2) of the key | ||
- If no partition or key is present choose a partition in a round-robin fashion | ||
```javascript | ||
const producer = kafka.producer() | ||
@@ -51,7 +270,73 @@ | ||
### Consuming messages with consumer groups | ||
Example with a defined partition: | ||
```javascript | ||
// ...require and connect... | ||
await producer.send({ | ||
topic: 'topic-name', | ||
messages: [ | ||
{ key: 'key1', value: 'hello world', partition: 0 }, | ||
{ key: 'key2', value: 'hey hey!', partition: 1 } | ||
], | ||
}) | ||
``` | ||
#### <a name="producing-messages-gzip-compression"></a> GZIP compression | ||
TODO: write | ||
#### <a name="producing-messages-custom-partitioner"></a> Custom partitioner | ||
It's possible to assign a custom partitioner to the consumer. A partitioner is a function which returns another function responsible for the partition selection, something like this: | ||
```javascript | ||
const MyPartitioner = () => { | ||
// some initialization | ||
return ({ topic, partitionMetadata, message }) => { | ||
// select a partition based on some logic | ||
// return the partition number | ||
return 0 | ||
} | ||
} | ||
``` | ||
`partitionMetadata` is an array of partitions with the following structure: | ||
`{ partitionId: <NodeId>, leader: <NodeId> }` | ||
Example: | ||
```javascript | ||
[ | ||
{ partitionId: 1, leader: 1 }, | ||
{ partitionId: 2, leader: 2 }, | ||
{ partitionId: 0, leader: 0 } | ||
] | ||
``` | ||
To Configure your partitioner use the option `createPartitioner`. | ||
```javascript | ||
kafka.producer({ createPartitioner: MyPartitioner }) | ||
``` | ||
#### <a name="producing-messages-retry"></a> Retry | ||
The option `retry` can be used to customize the configuration for the producer. | ||
Take a look at [Retry](#setup-client-default-retry) for more information. | ||
### <a name="consuming-messages"></a> Consuming messages from Kafka | ||
Consumer groups allow a group of machines or processes to coordinate access to a list of topics, distributing the load among the consumers. When a consumer fails the load is automatically distributed to other members of the group. Consumer groups must have unique group ids. | ||
Creating the consumer: | ||
```javascript | ||
const consumer = kafka.consumer({ groupId: 'my-group' }) | ||
``` | ||
Subscribing to some topics: | ||
```javascript | ||
async () => { | ||
@@ -65,3 +350,21 @@ await consumer.connect() | ||
// await consumer.subscribe({ topic: 'topic-name', fromBeginning: true }) | ||
} | ||
``` | ||
KafkaJS offers you two ways to process your data: `eachMessage` and `eachBatch` | ||
#### <a name="consuming-messages-each-message"></a> eachMessage | ||
This handler provides a convenient API, feeding your function one message at a time. The handler will automatically commit your offsets and heartbeat at the configured interval. If you are just looking to get started with Kafka consumers this should be your first solution. | ||
```javascript | ||
async () => { | ||
await consumer.connect() | ||
// Subscribe can be called several times | ||
await consumer.subscribe({ topic: 'topic-name' }) | ||
// It's possible to start from the beginning: | ||
// await consumer.subscribe({ topic: 'topic-name', fromBeginning: true }) | ||
await consumer.run({ | ||
@@ -81,4 +384,6 @@ eachMessage: async ({ topic, partition, message }) => { | ||
it's also possible to consume the batch instead of each message, example: | ||
#### <a name="consuming-messages-each-batch"></a> eachBatch | ||
Some use cases can be optimized by dealing with batches rather than single messages. This handler will feed your function batches and some utility functions to give your code more flexibility. Be aware that using `eachBatch` is considered a more advanced use case since you will have to understand how session timeouts and heartbeats are connected. All resolved offsets will be automatically committed after the function is executed. | ||
```javascript | ||
@@ -101,3 +406,4 @@ // create consumer, connect and subscribe ... | ||
resolveOffset(message.offset) | ||
await resolveOffset(message.offset) | ||
await heartbeat() | ||
} | ||
@@ -108,26 +414,28 @@ }, | ||
// remember to close your consumer when you leave | ||
await consumer.disconnect() | ||
``` | ||
### Configure SSL and SASL | ||
* `highWatermark` is the last committed offset within the topic partition. It can be useful for calculating lag. | ||
```javascript | ||
const fs = require('fs') | ||
const kafka = new Kafka({ | ||
clientId: 'my-app', | ||
brokers: ['kafka1:9092', 'kafka2:9092'] | ||
ssl: { | ||
cert: fs.readFileSync('<path/to>/client_cert.pem', 'utf-8'), | ||
key: fs.readFileSync('<path/to>/client_key.pem', 'utf-8'), | ||
ca: [fs.readFileSync('<path/to>/ca_cert.pem', 'utf-8')], | ||
}, | ||
sasl: { | ||
mechanism: 'plain', | ||
username: 'my-username', | ||
password: 'my-password', | ||
}, | ||
}) | ||
``` | ||
#### <a name="consuming-messages-options"></a> Options | ||
## Development | ||
- __createPartitionAssigner__ - default: `round robin` | ||
- __sessionTimeout__ - Timeout in milliseconds used to detect failures. The consumer sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this consumer from the group and initiate a rebalance. default: `30000` | ||
- __heartbeatInterval__ - The expected time in milliseconds between heartbeats to the consumer coordinator. Heartbeats are used to ensure that the consumer's session stays active. The value must be set lower than session timeout. default: `3000` | ||
- __maxBytesPerPartition__ - The maximum amount of data per-partition the server will return. This size must be at least as large as the maximum message size the server allows or else it is possible for the producer to send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large message on a certain partition. default: `1048576` (1MB) | ||
- __minBytes__ - Minimum amount of data the server should return for a fetch request, otherwise wait up to `maxWaitTimeInMs` for more data to accumulate. default: `1` | ||
- __maxBytes__ - Maximum amount of bytes to accumulate in the response. Supported by Kafka >= `0.10.1.0`. default: `10485760` (10MB) | ||
- __maxWaitTimeInMs__ - The maximum amount of time in milliseconds the server will block before answering the fetch request if there isn’t sufficient data to immediately satisfy the requirement given by `minBytes`. default: `5000`, | ||
- __retry__ - default: `{ retries: 10 }` | ||
#### <a name="consuming-messages-custom-assigner"></a> Custom assigner | ||
TODO: write | ||
## <a name="instrumentation"></a> Instrumentation | ||
TODO: write | ||
## <a name="development"></a> Development | ||
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol | ||
@@ -146,2 +454,5 @@ http://kafka.apache.org/protocol.html | ||
yarn test:local | ||
# To run with logs | ||
# KAFKAJS_LOG_LEVEL=debug yarn test:local | ||
``` | ||
@@ -148,0 +459,0 @@ |
@@ -204,4 +204,4 @@ const { Types: Compression } = require('../protocol/message/compression') | ||
* @param {string} [protocolType="consumer"] Unique name for class of protocols implemented by group | ||
* @param {Array} [groupProtocols=[{ name: 'default' }]] List of protocols that the member supports | ||
* [{ name: 'default', metadata: null }] | ||
* @param {Array} groupProtocols List of protocols that the member supports (assignment strategy) | ||
* [{ name: 'AssignerName', metadata: '{"version": 1, "topics": []}' }] | ||
* @returns {Promise} | ||
@@ -214,3 +214,3 @@ */ | ||
protocolType = 'consumer', | ||
groupProtocols = [{ name: 'default' }], | ||
groupProtocols, | ||
}) { | ||
@@ -329,2 +329,12 @@ // TODO: validate groupId and sessionTimeout (maybe default for sessionTimeout) | ||
} | ||
/** | ||
* @public | ||
* @param {Array} groupIds | ||
* @returns {Promise} | ||
*/ | ||
async describeGroups({ groupIds }) { | ||
const describeGroups = this.lookupRequest(apiKeys.DescribeGroups, requests.DescribeGroups) | ||
return await this.connection.send(describeGroups({ groupIds })) | ||
} | ||
} |
@@ -6,3 +6,6 @@ /** | ||
*/ | ||
module.exports = ({ cluster }) => { | ||
module.exports = ({ cluster }) => ({ | ||
name: 'RoundRobinAssigner', | ||
version: 1, | ||
/** | ||
@@ -31,3 +34,3 @@ * This process can result in imbalanced assignments | ||
*/ | ||
return ({ members, topics }) => { | ||
assign({ members, topics }) { | ||
const membersCount = members.length | ||
@@ -57,3 +60,10 @@ const sortedMembers = members.map(({ memberId }) => memberId).sort() | ||
})) | ||
} | ||
} | ||
}, | ||
protocol({ topics }) { | ||
return { | ||
name: this.name, | ||
metadata: JSON.stringify({ version: this.version, topics }), | ||
} | ||
}, | ||
}) |
const flatten = require('../utils/flatten') | ||
const OffsetManager = require('./offsetManager') | ||
const Batch = require('./batch') | ||
const SeekOffsets = require('./seekOffsets') | ||
const { KafkaJSError } = require('../errors') | ||
@@ -43,2 +44,3 @@ const { HEARTBEAT } = require('./instrumentationEvents') | ||
this.seekOffset = new SeekOffsets() | ||
this.coordinator = null | ||
@@ -69,2 +71,10 @@ this.generationId = null | ||
memberId: this.memberId || '', | ||
// Keep the default procotol in the list to enable consumers running an old | ||
// version of Kafkajs to smoothly transition to the new protocol. The changes | ||
// in the protocol format and name happened on PR #27 | ||
groupProtocols: [ | ||
this.assigner.protocol({ topics: this.topics }), | ||
{ name: 'default', metadata: Buffer.from([0, 0]) }, | ||
], | ||
}) | ||
@@ -91,3 +101,3 @@ | ||
this.logger.debug('Chosen as group leader', { groupId, generationId, memberId, topics }) | ||
assignment = this.assigner({ members, topics }) | ||
assignment = this.assigner.assign({ members, topics }) | ||
this.logger.debug('Group assignment', { groupId, generationId, topics, assignment }) | ||
@@ -125,2 +135,15 @@ } | ||
/** | ||
* Update the consumer offset for the given topic/partition. This will be used | ||
* on the next fetch. If this API is invoked for the same topic/partition more | ||
* than once, the latest offset will be used on the next fetch. | ||
* | ||
* @param {string} topic | ||
* @param {number} partition | ||
* @param {string} offset | ||
*/ | ||
seek({ topic, partition, offset }) { | ||
this.seekOffset.set(topic, partition, offset) | ||
} | ||
async commitOffsets() { | ||
@@ -150,2 +173,13 @@ await this.offsetManager.commitOffsets() | ||
const requestsPerLeader = {} | ||
while (this.seekOffset.size > 0) { | ||
const seekEntry = this.seekOffset.pop() | ||
this.logger.debug('Seek offset', { | ||
groupId: this.groupId, | ||
memberId: this.memberId, | ||
seek: seekEntry, | ||
}) | ||
await this.offsetManager.seek(seekEntry) | ||
} | ||
await this.offsetManager.resolveOffsets() | ||
@@ -205,13 +239,3 @@ | ||
if (e.name === 'KafkaJSOffsetOutOfRange') { | ||
this.logger.error('Offset out of range, resetting to default offset', { | ||
topic: e.topic, | ||
groupId: this.groupId, | ||
memberId: this.memberId, | ||
partition: e.partition, | ||
}) | ||
await this.offsetManager.setDefaultOffset({ | ||
topic: e.topic, | ||
partition: e.partition, | ||
}) | ||
await this.recoverFromOffsetOutOfRange(e) | ||
} | ||
@@ -226,2 +250,16 @@ | ||
} | ||
async recoverFromOffsetOutOfRange(e) { | ||
this.logger.error('Offset out of range, resetting to default offset', { | ||
topic: e.topic, | ||
partition: e.partition, | ||
groupId: this.groupId, | ||
memberId: this.memberId, | ||
}) | ||
await this.offsetManager.setDefaultOffset({ | ||
topic: e.topic, | ||
partition: e.partition, | ||
}) | ||
} | ||
} |
@@ -0,1 +1,3 @@ | ||
const Long = require('long') | ||
const createRetry = require('../retry') | ||
const createRoundRobinAssigned = require('./assigners/roundRobinAssigner') | ||
@@ -6,6 +8,8 @@ const ConsumerGroup = require('./consumerGroup') | ||
const InstrumentationEventEmitter = require('../instrumentation/emitter') | ||
const { KafkaJSError } = require('../errors') | ||
const { KafkaJSNonRetriableError } = require('../errors') | ||
const eventNames = Object.values(events) | ||
const eventKeys = Object.keys(events) | ||
const { keys, values } = Object | ||
const eventNames = values(events) | ||
const eventKeys = keys(events) | ||
.map(key => `consumer.events.${key}`) | ||
@@ -32,9 +36,11 @@ .join(', ') | ||
const logger = rootLogger.namespace('Consumer') | ||
const topics = {} | ||
let runner = null | ||
let consumerGroup = null | ||
const createRunner = ({ eachBatch, eachMessage, onCrash }) => { | ||
const consumerGroup = new ConsumerGroup({ | ||
const createConsumerGroup = () => { | ||
return new ConsumerGroup({ | ||
logger: rootLogger, | ||
topics: Object.keys(topics), | ||
topics: keys(topics), | ||
topicConfigurations: topics, | ||
@@ -51,3 +57,5 @@ cluster, | ||
}) | ||
} | ||
const createRunner = ({ eachBatch, eachMessage, onCrash }) => { | ||
return new Runner({ | ||
@@ -99,2 +107,4 @@ logger: rootLogger, | ||
const run = async ({ eachBatch = null, eachMessage = null } = {}) => { | ||
consumerGroup = createConsumerGroup() | ||
const start = async onCrash => { | ||
@@ -106,2 +116,7 @@ logger.info('Starting', { groupId }) | ||
const restart = onCrash => { | ||
consumerGroup = createConsumerGroup() | ||
start(onCrash) | ||
} | ||
const onCrash = async e => { | ||
@@ -116,3 +131,3 @@ logger.error(`Crash: ${e.name}: ${e.message}`, { retryCount: e.retryCount, groupId }) | ||
}) | ||
setTimeout(() => start(onCrash), e.retryTime) | ||
setTimeout(() => restart(onCrash), e.retryTime) | ||
} | ||
@@ -131,5 +146,3 @@ } | ||
if (!eventNames.includes(eventName)) { | ||
throw new KafkaJSError(`Event name should be one of ${eventKeys}`, { | ||
retriable: false, | ||
}) | ||
throw new KafkaJSNonRetriableError(`Event name should be one of ${eventKeys}`) | ||
} | ||
@@ -140,2 +153,58 @@ | ||
/** | ||
* @param {string} topic | ||
* @param {number} partition | ||
* @param {string} offset | ||
*/ | ||
const seek = ({ topic, partition, offset }) => { | ||
if (!topic) { | ||
throw new KafkaJSNonRetriableError(`Invalid topic ${topic}`) | ||
} | ||
if (isNaN(partition)) { | ||
throw new KafkaJSNonRetriableError( | ||
`Invalid partition, expected a number received ${partition}` | ||
) | ||
} | ||
try { | ||
if (Long.fromValue(offset).lessThan(0)) { | ||
throw new KafkaJSNonRetriableError('Offset must not be a negative number') | ||
} | ||
} catch (_) { | ||
throw new KafkaJSNonRetriableError(`Invalid offset, expected a long received ${offset}`) | ||
} | ||
if (!consumerGroup) { | ||
throw new KafkaJSNonRetriableError( | ||
'Consumer group was not initialized, consumer#run must be called first' | ||
) | ||
} | ||
consumerGroup.seek({ topic, partition, offset }) | ||
} | ||
/** | ||
* @returns Promise<GroupDescription> | ||
* | ||
* @typedef {Object} GroupDescription | ||
* @property {string} groupId | ||
* @property {Array<MemberDescription>} members | ||
* @property {string} protocol | ||
* @property {string} protocolType | ||
* @property {string} state | ||
* | ||
* @typedef {Object} MemberDescription | ||
* @property {string} clientHost | ||
* @property {string} clientId | ||
* @property {string} memberId | ||
* @property {Buffer} memberAssignment | ||
* @property {Buffer} memberMetadata | ||
*/ | ||
const describeGroup = async () => { | ||
const coordinator = await cluster.findGroupCoordinator({ groupId }) | ||
const retrier = createRetry(retry) | ||
return retrier(async () => { | ||
const { groups } = await coordinator.describeGroups({ groupIds: [groupId] }) | ||
return groups.find(group => group.groupId === groupId) | ||
}) | ||
} | ||
return { | ||
@@ -146,2 +215,4 @@ connect, | ||
run, | ||
seek, | ||
describeGroup, | ||
on, | ||
@@ -148,0 +219,0 @@ events, |
@@ -22,3 +22,10 @@ const Long = require('long') | ||
this.coordinator = coordinator | ||
// memberAssignment format: | ||
// { | ||
// 'topic1': [0, 1, 2, 3], | ||
// 'topic2': [0, 1, 2, 3, 4, 5], | ||
// } | ||
this.memberAssignment = memberAssignment | ||
this.topicConfigurations = topicConfigurations | ||
@@ -31,20 +38,9 @@ this.instrumentationEmitter = instrumentationEmitter | ||
this.topics = keys(memberAssignment) | ||
this.clearOffsets() | ||
this.clearAllOffsets() | ||
} | ||
clearOffsets() { | ||
this.committedOffsets = indexTopics(this.topics) | ||
this.resolvedOffsets = indexTopics(this.topics) | ||
} | ||
/** | ||
* @param {string} topic | ||
* @param {number} partition | ||
* @returns {object} offsets by topic and partition | ||
* { | ||
* 'topic-name': { | ||
* 0: '-1', | ||
* 1: '10' | ||
* } | ||
* } | ||
* @returns {string} | ||
*/ | ||
@@ -61,10 +57,8 @@ nextOffset(topic, partition) { | ||
const nextOffset = Long.fromValue(offset) | ||
.add(1) | ||
.toString() | ||
this.resolvedOffsets[topic][partition] = nextOffset | ||
return offset | ||
return Long.fromValue(offset) | ||
} | ||
/** | ||
* @returns {Broker} | ||
*/ | ||
async getCoordinator() { | ||
@@ -118,5 +112,36 @@ if (!this.coordinator.isConnected()) { | ||
this.clearOffsets() | ||
this.clearOffsets({ topic, partition }) | ||
} | ||
/** | ||
* Commit the given offset to the topic/partition. If the consumer isn't assigned to the given | ||
* topic/partition this method will be a NO-OP. | ||
* | ||
* @param {string} topic | ||
* @param {number} partition | ||
* @param {string} offset | ||
*/ | ||
async seek({ topic, partition, offset }) { | ||
if (!this.memberAssignment[topic] || !this.memberAssignment[topic].includes(partition)) { | ||
return | ||
} | ||
const { groupId, generationId, memberId } = this | ||
const coordinator = await this.getCoordinator() | ||
await coordinator.offsetCommit({ | ||
groupId, | ||
memberId, | ||
groupGenerationId: generationId, | ||
topics: [ | ||
{ | ||
topic, | ||
partitions: [{ partition, offset }], | ||
}, | ||
], | ||
}) | ||
this.clearOffsets({ topic, partition }) | ||
} | ||
async commitOffsets() { | ||
@@ -227,2 +252,20 @@ const { groupId, generationId, memberId } = this | ||
} | ||
/** | ||
* @private | ||
* @param {string} topic | ||
* @param {number} partition | ||
*/ | ||
clearOffsets({ topic, partition }) { | ||
delete this.committedOffsets[topic][partition] | ||
delete this.resolvedOffsets[topic][partition] | ||
} | ||
/** | ||
* @private | ||
*/ | ||
clearAllOffsets() { | ||
this.committedOffsets = indexTopics(this.topics) | ||
this.resolvedOffsets = indexTopics(this.topics) | ||
} | ||
} |
@@ -114,3 +114,8 @@ const createRetry = require('../retry') | ||
if (!isKafkaJSError(e)) { | ||
this.logger.error(`Error when calling eachMessage`, { stack: e.stack }) | ||
this.logger.error(`Error when calling eachMessage`, { | ||
topic, | ||
partition, | ||
offset: message.offset, | ||
stack: e.stack, | ||
}) | ||
} | ||
@@ -144,3 +149,8 @@ | ||
if (!isKafkaJSError(e)) { | ||
this.logger.error(`Error when calling eachBatch`, { stack: e.stack }) | ||
this.logger.error(`Error when calling eachBatch`, { | ||
topic, | ||
partition, | ||
offset: batch.firstOffset(), | ||
stack: e.stack, | ||
}) | ||
} | ||
@@ -147,0 +157,0 @@ |
@@ -56,2 +56,3 @@ class KafkaJSError extends Error { | ||
KafkaJSError, | ||
KafkaJSNonRetriableError, | ||
KafkaJSPartialMessageError, | ||
@@ -58,0 +59,0 @@ KafkaJSBrokerNotFound, |
const { createLogger, LEVELS: { INFO } } = require('./loggers') | ||
const logFunctionConsole = require('./loggers/console') | ||
const LoggerConsole = require('./loggers/console') | ||
const Cluster = require('./cluster') | ||
@@ -7,2 +7,4 @@ const createProducer = require('./producer') | ||
const { assign } = Object | ||
module.exports = class Client { | ||
@@ -17,5 +19,5 @@ constructor({ | ||
logLevel = INFO, | ||
logFunction = logFunctionConsole, | ||
logCreator = LoggerConsole, | ||
}) { | ||
this.logger = createLogger({ level: logLevel, logFunction }) | ||
this.logger = createLogger({ level: logLevel, logCreator }) | ||
this.createCluster = () => | ||
@@ -37,7 +39,8 @@ new Cluster({ | ||
producer({ createPartitioner, retry } = {}) { | ||
const cluster = this.createCluster() | ||
return createProducer({ | ||
cluster: this.createCluster(), | ||
retry: assign({}, cluster.retry, retry), | ||
logger: this.logger, | ||
cluster, | ||
createPartitioner, | ||
retry, | ||
}) | ||
@@ -62,5 +65,7 @@ } | ||
) { | ||
const cluster = this.createCluster() | ||
return createConsumer({ | ||
cluster: this.createCluster(), | ||
retry: assign({}, cluster.retry, retry), | ||
logger: this.logger, | ||
cluster, | ||
groupId, | ||
@@ -74,5 +79,4 @@ createPartitionAssigner, | ||
maxWaitTimeInMs, | ||
retry, | ||
}) | ||
} | ||
} |
@@ -1,15 +0,21 @@ | ||
module.exports = (namespace, log) => { | ||
const label = namespace ? `[${namespace}] ` : '' | ||
const message = JSON.stringify(Object.assign(log, { message: `${label}${log.message}` })) | ||
const { LEVELS: logLevel } = require('./index') | ||
switch (log.level) { | ||
case 'INFO': | ||
module.exports = () => ({ namespace, level, label, log }) => { | ||
const prefix = namespace ? `[${namespace}] ` : '' | ||
const message = JSON.stringify( | ||
Object.assign({ level: label }, log, { | ||
message: `${prefix}${log.message}`, | ||
}) | ||
) | ||
switch (level) { | ||
case logLevel.INFO: | ||
return console.info(message) | ||
case 'ERROR': | ||
case logLevel.ERROR: | ||
return console.error(message) | ||
case 'WARN': | ||
case logLevel.WARN: | ||
return console.warn(message) | ||
case 'DEBUG': | ||
case logLevel.DEBUG: | ||
return console.log(message) | ||
} | ||
} |
@@ -0,1 +1,3 @@ | ||
const { assign } = Object | ||
const LEVELS = { | ||
@@ -9,3 +11,3 @@ NOTHING: 0, | ||
const createLevel = (label, level, currentLevel, namespace, loggerFunction) => ( | ||
const createLevel = (label, level, currentLevel, namespace, logFunction) => ( | ||
message, | ||
@@ -15,7 +17,8 @@ extra = {} | ||
if (level > currentLevel) return | ||
loggerFunction( | ||
logFunction({ | ||
namespace, | ||
Object.assign( | ||
level, | ||
label, | ||
log: assign( | ||
{ | ||
level: label, | ||
timestamp: new Date().toISOString(), | ||
@@ -26,8 +29,11 @@ logger: 'kafkajs', | ||
extra | ||
) | ||
) | ||
), | ||
}) | ||
} | ||
const createLogger = ({ level = LEVELS.INFO, logFunction = null } = {}) => { | ||
const logLevel = parseInt(process.env.LOG_LEVEL, 10) || level | ||
const createLogger = ({ level = LEVELS.INFO, logCreator = null } = {}) => { | ||
const envLogLevel = (process.env.KAFKAJS_LOG_LEVEL || '').toUpperCase() | ||
const logLevel = LEVELS[envLogLevel] || level | ||
const logFunction = logCreator(logLevel) | ||
const createLogFunctions = namespace => ({ | ||
@@ -40,3 +46,3 @@ info: createLevel('INFO', LEVELS.INFO, logLevel, namespace, logFunction), | ||
return Object.assign(createLogFunctions(), { | ||
return assign(createLogFunctions(), { | ||
namespace: namespace => createLogFunctions(namespace), | ||
@@ -43,0 +49,0 @@ }) |
@@ -309,5 +309,4 @@ const createRetry = require('../retry') | ||
const expectedResponseSize = decoder.readInt32() | ||
const receivedResponseSize = Buffer.byteLength(this.buffer) - Decoder.int32Size() | ||
if (receivedResponseSize < expectedResponseSize) { | ||
if (!decoder.canReadBytes(expectedResponseSize)) { | ||
return | ||
@@ -314,0 +313,0 @@ } |
@@ -65,2 +65,6 @@ const Long = require('long') | ||
canReadBytes(length) { | ||
return Buffer.byteLength(this.buffer) - this.offset >= length | ||
} | ||
readBytes() { | ||
@@ -67,0 +71,0 @@ const byteLength = this.readInt32() |
@@ -17,3 +17,3 @@ const requests = { | ||
SyncGroup: require('./syncGroup'), | ||
DescribeGroups: {}, | ||
DescribeGroups: require('./describeGroups'), | ||
ListGroups: {}, | ||
@@ -20,0 +20,0 @@ SaslHandshake: require('./saslHandshake'), |
@@ -1,1 +0,1 @@ | ||
{"type":"Buffer","data":[0,10,116,101,115,116,45,103,114,111,117,112,0,0,117,48,0,0,0,8,99,111,110,115,117,109,101,114,0,0,0,1,0,7,100,101,102,97,117,108,116,0,0,0,2,0,0]} | ||
{"data":[0,10,116,101,115,116,45,103,114,111,117,112,0,0,117,48,0,0,0,8,99,111,110,115,117,109,101,114,0,0,0,1,0,7,100,101,102,97,117,108,116,0,0,0,0],"type":"Buffer"} |
@@ -29,5 +29,4 @@ const Encoder = require('../../../encoder') | ||
const encodeGroupProtocols = ({ name, metadata = {} }) => { | ||
const protocolMetadata = new Encoder().writeInt16(metadata.version || 0) | ||
return new Encoder().writeString(name).writeBytes(protocolMetadata.buffer) | ||
const encodeGroupProtocols = ({ name, metadata = '' }) => { | ||
return new Encoder().writeString(name).writeBytes(metadata) | ||
} |
Sorry, the diff of this file is not supported yet
329122
170
5724
454
16