kafkajs - npm Package Compare versions

Comparing version 1.0.1 to 1.1.0

docker-compose.0_11.yml


CHANGELOG.md

@@ -8,2 +8,7 @@ # Changelog

## [1.1.0] - 2018-06-14
### Added
- Support to SASL SCRAM (`scram-sha-256` and `scram-sha-512`) #72
- Admin client with support to create topics #73
## [1.0.1] - 2018-05-18

@@ -10,0 +15,0 @@ ### Fixed


package.json
{
"name": "kafkajs",
"version": "1.0.1",
"version": "1.1.0",
"description": "A modern Apache Kafka client for node.js",

@@ -9,3 +9,5 @@ "author": "Tulio Ornelas <ornelas.tulio@gmail.com>",

"keywords": [
"kafka"
"kafka",
"sasl",
"scram"
],

@@ -12,0 +14,0 @@ "engines": {

@@ -18,2 +18,3 @@ [![KafkaJS](https://raw.githubusercontent.com/tulios/kafkajs/master/logo.png)](https://github.com/tulios/kafkajs)

- Plain, SSL and SASL_SSL implementations
- Support for SCRAM-SHA-256 and SCRAM-SHA-512

@@ -23,24 +24,29 @@ ## Table of Contents

- [Installation](#installation)
- [Usage](#usage)
- [Setting up the Client](#setup-client)
- [SSL](#setup-client-ssl)
- [SASL](#setup-client-sasl)
- [Connection timeout](#setup-client-connection-timeout)
- [Default retry](#setup-client-default-retry)
- [Logger](#setup-client-logger)
- [Producing Messages to Kafka](#producing-messages)
- [Custom partitioner](#producing-messages-custom-partitioner)
- [Compression](#producing-messages-gzip-compression)
- [Retry](#producing-messages-retry)
- [Consuming messages from Kafka](#consuming-messages)
- [eachMessage](#consuming-messages-each-message)
- [eachBatch](#consuming-messages-each-batch)
- [Options](#consuming-messages-options)
- [Pause, Resume, & Seek](#consuming-messages-pause-resume)
- [Seek](#consuming-messages-seek)
- [Custom partition assigner](#consuming-messages-custom-partition-assigner)
- [Describe group](#consuming-messages-describe-group)
- [Instrumentation](#instrumentation)
- [Custom logger](#custom-logger)
- [Development](#development)
- [Configuration](#configuration)
- [SSL](#configuration-ssl)
- [SASL](#configuration-sasl)
- [Connection timeout](#configuration-connection-timeout)
- [Default retry](#configuration-default-retry)
- [Logging](#configuration-logging)
- [Producing Messages](#producing-messages)
- [Custom partitioner](#producing-messages-custom-partitioner)
- [Retry](#producing-messages-retry)
- [Compression](#producing-messages-compression)
- [GZIP](#producing-messages-compression-gzip)
- [Other](#producing-messages-compression-other)
- [Consuming messages](#consuming-messages)
- [eachMessage](#consuming-messages-each-message)
- [eachBatch](#consuming-messages-each-batch)
- [Options](#consuming-messages-options)
- [Pause, Resume, & Seek](#consuming-messages-pause-resume)
- [Seek](#consuming-messages-seek)
- [Custom partition assigner](#consuming-messages-custom-partition-assigner)
- [Describe group](#consuming-messages-describe-group)
- [Compression](#consuming-messages-compression)
- [Admin](#admin)
- [Create Topics](#admin-create-topics)
- [Instrumentation](#instrumentation)
- [Custom logging](#custom-logging)
- [Retry (detailed)](#configuration-default-retry-detailed)
- [Development](#development)

@@ -54,6 +60,4 @@ ## <a name="installation"></a> Installation

## <a name="usage"></a> Usage
## <a name="setup-client"></a> Configuration
### <a name="setup-client"></a> Setting up the Client
The client must be configured with at least one broker. The brokers on the list are considered seed brokers and are only used to bootstrap the client and load initial metadata.
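A minimal client setup, as a sketch (the `clientId` and broker hostnames are illustrative):

```javascript
const { Kafka } = require('kafkajs')

// the brokers listed here are only seed brokers, used to bootstrap the client
const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['kafka1:9092', 'kafka2:9092'],
})
```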

@@ -71,3 +75,3 @@

#### <a name="setup-client-ssl"></a> SSL
### <a name="configuration-ssl"></a> SSL

@@ -91,7 +95,7 @@ The `ssl` option can be used to configure the TLS sockets. The options are passed directly to [`tls.connect`](https://nodejs.org/api/tls.html#tls_tls_connect_options_callback) and used to create the TLS Secure Context, all options are accepted.

Take a look at [TLS create secure context](https://nodejs.org/dist/latest-v8.x/docs/api/tls.html#tls_tls_createsecurecontext_options) for more information. `NODE_EXTRA_CA_CERTS` can be used to add custom CAs. Use `ssl: true` if you don't have any extra configurations and want to enable SSL.
Refer to [TLS create secure context](https://nodejs.org/dist/latest-v8.x/docs/api/tls.html#tls_tls_createsecurecontext_options) for more information. `NODE_EXTRA_CA_CERTS` can be used to add custom CAs. Use `ssl: true` if you don't have any extra configurations and want to enable SSL.
#### <a name="setup-client-sasl"></a> SASL
### <a name="configuration-sasl"></a> SASL
Kafka has support for using SASL to authenticate clients. The `sasl` option can be used to configure the authentication mechanism. Currently, KafkaJS only supports the `PLAIN` mechanism.
Kafka has support for using SASL to authenticate clients. The `sasl` option can be used to configure the authentication mechanism. Currently, KafkaJS supports `PLAIN`, `SCRAM-SHA-256`, and `SCRAM-SHA-512` mechanisms.

@@ -103,3 +107,3 @@ ```javascript

sasl: {
mechanism: 'plain',
mechanism: 'plain', // scram-sha-256 or scram-sha-512
username: 'my-username',

@@ -113,3 +117,3 @@ password: 'my-password'

#### <a name="setup-client-connection-timeout"></a> Connection Timeout
### <a name="configuration-connection-timeout"></a> Connection Timeout

@@ -126,6 +130,9 @@ Time in milliseconds to wait for a successful connection. The default value is: `1000`.

#### <a name="setup-client-default-retry"></a> Default Retry
### <a name="configuration-default-retry"></a> Default Retry
The `retry` option can be used to set the default configuration. The retry mechanism uses a randomization function that grows exponentially. The configuration will be used to retry connections and API calls to Kafka (when using producers or consumers).
The `retry` option can be used to set the configuration of the retry mechanism, which will be used to retry connections and API calls to Kafka (when using producers or consumers).
The retry mechanism uses a randomization function that grows exponentially.
[Detailed example](#configuration-default-retry-detailed)
If the max number of retries is exceeded, the retrier will throw `KafkaJSNumberOfRetriesExceeded` and interrupt. Producers will bubble up the error to the user code; consumers will wait for the retry time attached to the exception (based on the number of attempts) and perform a full restart.
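As a sketch, the retry options can be passed when creating the client; `initialRetryTime`, `factor`, and `multiplier` below match the values used in the detailed retry section, while the `retries` value is only illustrative:

```javascript
const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['kafka1:9092'],
  retry: {
    initialRetryTime: 300, // flat wait before the first retry, in ms
    factor: 0.2,           // randomization factor
    multiplier: 2,         // exponential growth factor
    retries: 5,            // max retries before KafkaJSNumberOfRetriesExceeded is thrown
  },
})
```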

@@ -156,8 +163,7 @@

#### <a name="setup-client-logger"></a> Logger
### <a name="configuration-logging"></a> Logging
KafkaJS has a built-in `STDOUT` logger which outputs JSON. It also accepts a custom log creator which allows you to integrate your favorite logger library.
There are 5 log levels available: `NOTHING`, `ERROR`, `WARN`, `INFO`, and `DEBUG`. `INFO` is configured by default.
KafkaJS has a built-in `STDOUT` logger which outputs JSON. It also accepts a custom log creator which allows you to integrate your favorite logger library. There are 5 log levels available: `NOTHING`, `ERROR`, `WARN`, `INFO`, and `DEBUG`. `INFO` is configured by default.
__How to configure the log level?__
##### Log level
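A sketch of selecting a log level at client creation, assuming the `logLevel` export; the level chosen is illustrative:

```javascript
const { Kafka, logLevel } = require('kafkajs')

// only log errors; the five levels are NOTHING, ERROR, WARN, INFO, and DEBUG
const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['kafka1:9092'],
  logLevel: logLevel.ERROR,
})
```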

@@ -181,7 +187,7 @@ ```javascript

NOTE: for more information on how to customize your logs, take a look at [Custom logger](#custom-logger)
NOTE: for more information on how to customize your logs, take a look at [Custom logging](#custom-logging)
### <a name="producing-messages"></a> Producing Messages to Kafka
## <a name="producing-messages"></a> Producing Messages
To publish messages to Kafka you have to create a producer; with a client in hand you just have to call the `producer` function, for example:
To publish messages to Kafka you have to create a producer. Simply call the `producer` function of the client to create it:

@@ -192,8 +198,2 @@ ```javascript

By default, the producer is configured to distribute the messages with the following logic:
- If a partition is specified in the message, use it
- If no partition is specified but a key is present choose a partition based on a hash (murmur2) of the key
- If no partition or key is present choose a partition in a round-robin fashion
The method `send` is used to publish messages to the Kafka cluster.
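A minimal produce call, as a sketch (topic and message contents are illustrative):

```javascript
const producer = kafka.producer()

await producer.connect()
await producer.send({
  topic: 'topic-a',
  messages: [{ key: 'key1', value: 'hello world' }],
})
await producer.disconnect()
```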

@@ -254,9 +254,56 @@

#### <a name="producing-messages-gzip-compression"></a> Compression
By default, the producer is configured to distribute the messages with the following logic:
KafkaJS __only__ supports GZIP natively; the library aims to have a small footprint and as little dependencies as possible. The remaining codecs can be easily implemented using existing libraries. Plugins providing other codecs might exist in the future, but there are no plans to implement them shortly.
- If a partition is specified in the message, use it
- If no partition is specified but a key is present choose a partition based on a hash (murmur2) of the key
- If no partition or key is present choose a partition in a round-robin fashion
##### GZIP
### <a name="producing-messages-custom-partitioner"></a> Custom partitioner
It's possible to assign a custom partitioner to the producer. A partitioner is a function which returns another function responsible for the partition selection, something like this:
```javascript
const MyPartitioner = () => {
// some initialization
return ({ topic, partitionMetadata, message }) => {
// select a partition based on some logic
// return the partition number
return 0
}
}
```
`partitionMetadata` is an array of partitions with the following structure:
`{ partitionId: <NodeId>, leader: <NodeId> }`
Example:
```javascript
[
{ partitionId: 1, leader: 1 },
{ partitionId: 2, leader: 2 },
{ partitionId: 0, leader: 0 }
]
```
To configure your partitioner, use the option `createPartitioner`.
```javascript
kafka.producer({ createPartitioner: MyPartitioner })
```
### <a name="producing-messages-retry"></a> Retry
The option `retry` can be used to customize the configuration for the producer.
Take a look at [Retry](#configuration-default-retry) for more information.
### <a name="producing-messages-compression"></a> Compression
Since KafkaJS aims to have as small a footprint and as few dependencies as possible, only the GZIP codec is part of the core functionality. Providing plugins supporting other codecs might be considered in the future.
#### <a name="producing-messages-compression-gzip"></a> GZIP
```javascript
const { CompressionTypes } = require('kafkajs')
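// (illustrative sketch, not part of the original snippet) a send call using
// GZIP via the `compression` option; topic and message values are made up
await producer.send({
  topic: 'topic-a',
  compression: CompressionTypes.GZIP,
  messages: [{ key: 'key1', value: 'hello world' }],
})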

@@ -278,8 +325,10 @@

##### Adding Snappy or LZ4 codecs
#### <a name="producing-messages-compression-other"></a> Other
A codec is an object with two `async` functions: `compress` and `decompress`.
Any other codec than GZIP can be easily implemented using existing libraries.
Example using the snappy package:
This is an example of how one would go about in order to add the Snappy codec.
First of all, a codec is an object with two `async` functions: `compress` and `decompress`. Import the libraries and define the codec object:
```javascript

@@ -303,3 +352,3 @@ const { promisify } = require('util')

Then, to add this implementation:
Now that we have the codec object, we can add it to the implementation:
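The snippet elided by the diff would look roughly like this; a sketch assuming the callback-based `snappy` package and a `CompressionCodecs` registry export:

```javascript
const { CompressionTypes, CompressionCodecs } = require('kafkajs')
const { promisify } = require('util')
const snappy = require('snappy')

// wrap the callback-based snappy functions in promises
const compress = promisify(snappy.compress)
const decompress = promisify(snappy.uncompress)

// a codec is a factory returning an object with async compress/decompress
const SnappyCodec = () => ({
  compress: encoder => compress(encoder.buffer),
  decompress: buffer => decompress(buffer),
})

// register the codec so CompressionTypes.Snappy can be used when producing
CompressionCodecs[CompressionTypes.Snappy] = SnappyCodec
```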

@@ -326,47 +375,6 @@ ```javascript

#### <a name="producing-messages-custom-partitioner"></a> Custom partitioner
## <a name="consuming-messages"></a> Consuming messages from Kafka
It's possible to assign a custom partitioner to the producer. A partitioner is a function which returns another function responsible for the partition selection, something like this:
Consumer groups allow a group of machines or processes to coordinate access to a list of topics, distributing the load among the consumers. When a consumer fails the load is automatically distributed to other members of the group. Consumer groups must have unique group ids within the cluster, from a Kafka broker's perspective.
```javascript
const MyPartitioner = () => {
// some initialization
return ({ topic, partitionMetadata, message }) => {
// select a partition based on some logic
// return the partition number
return 0
}
}
```
`partitionMetadata` is an array of partitions with the following structure:
`{ partitionId: <NodeId>, leader: <NodeId> }`
Example:
```javascript
[
{ partitionId: 1, leader: 1 },
{ partitionId: 2, leader: 2 },
{ partitionId: 0, leader: 0 }
]
```
To Configure your partitioner use the option `createPartitioner`.
```javascript
kafka.producer({ createPartitioner: MyPartitioner })
```
#### <a name="producing-messages-retry"></a> Retry
The option `retry` can be used to customize the configuration for the producer.
Take a look at [Retry](#setup-client-default-retry) for more information.
### <a name="consuming-messages"></a> Consuming messages from Kafka
Consumer groups allow a group of machines or processes to coordinate access to a list of topics, distributing the load among the consumers. When a consumer fails the load is automatically distributed to other members of the group. Consumer groups must have unique group ids.
Creating the consumer:

@@ -395,5 +403,5 @@

#### <a name="consuming-messages-each-message"></a> eachMessage
### <a name="consuming-messages-each-message"></a> eachMessage
This handler provides a convenient API, feeding your function one message at a time. The handler will automatically commit your offsets and heartbeat at the configured interval. If you are just looking to get started with Kafka consumers this should be your first solution.
The `eachMessage` handler provides a convenient and easy-to-use API, feeding your function one message at a time. It is implemented on top of `eachBatch`, and it will automatically commit your offsets and heartbeat at the configured interval for you. If you are just looking to get started with Kafka consumers, this is a good place to start.
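A minimal `eachMessage` consumer, as a sketch (topic and group id are illustrative):

```javascript
const consumer = kafka.consumer({ groupId: 'my-group' })

await consumer.connect()
await consumer.subscribe({ topic: 'topic-a' })

await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    console.log({
      key: message.key.toString(),
      value: message.value.toString(),
    })
  },
})
```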

@@ -424,6 +432,8 @@ ```javascript

#### <a name="consuming-messages-each-batch"></a> eachBatch
### <a name="consuming-messages-each-batch"></a> eachBatch
Some use cases can be optimized by dealing with batches rather than single messages. This handler will feed your function batches and some utility functions to give your code more flexibility. Be aware that using `eachBatch` is considered a more advanced use case since you will have to understand how session timeouts and heartbeats are connected. All resolved offsets will be automatically committed after the function is executed.
Some use cases require dealing with batches directly. This handler will feed your function batches and provide some utility functions to give your code more flexibility: `resolveOffset`, `heartbeat`, and `isRunning`. All resolved offsets will be automatically committed after the function is executed.
Be aware that using `eachBatch` directly is considered a more advanced use case as compared to using `eachMessage`, since you will have to understand how session timeouts and heartbeats are connected.
```javascript

@@ -451,11 +461,12 @@ // create consumer, connect and subscribe ...

})
// remember to close your consumer when you leave
await consumer.disconnect()
```
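The handler body elided in the snippet above would look roughly like this; a sketch using the utility functions described here (`resolveOffset`, `heartbeat`, `isRunning`):

```javascript
await consumer.run({
  eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning }) => {
    for (let message of batch.messages) {
      if (!isRunning()) break
      // process the message ...
      resolveOffset(message.offset)
      await heartbeat()
    }
  },
})
```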
> `highWatermark` is the last committed offset within the topic partition. It can be useful for calculating lag.
> `batch.highWatermark` is the last committed offset within the topic partition. It can be useful for calculating lag.
`resolveOffset` is used to mark the message as processed. In case of errors, the consumer will automatically commit the resolved offsets. With the default configuration, the function can't be interrupted without ignoring the unprocessed message; this happens because after the function is executed the last offset of the batch is automatically resolved and committed. To have a fine grain control of message processing it's possible to disable the auto-resolve, setting the property `eachBatchAutoResolve` to false. Example:
> `eachBatchAutoResolve` configures auto-resolve of batch processing. If set to true, KafkaJS will automatically commit the last offset of the batch if `eachBatch` doesn't throw an error. Default: true.
> `resolveOffset()` is used to mark a message in the batch as processed. In case of errors, the consumer will automatically commit the resolved offsets.
Example:
```javascript

@@ -475,5 +486,5 @@ consumer.run({

In this example, if the consumer is shutting down in the middle of the batch, the remaining messages won't be resolved and therefore not committed.
In the example above, if the consumer is shutting down in the middle of the batch, the remaining messages won't be resolved and therefore not committed. This way, you can quickly shut down the consumer without losing/skipping any messages.
#### <a name="consuming-messages-options"></a> Options
### <a name="consuming-messages-options"></a> Options

@@ -503,40 +514,26 @@ ```javascript

| maxWaitTimeInMs | The maximum amount of time in milliseconds the server will block before answering the fetch request if there isn’t sufficient data to immediately satisfy the requirement given by `minBytes` | `5000` |
| retry | Take a look at [Retry](#setup-client-default-retry) for more information\ | `{ retries: 10 }` |
| retry | See [retry](#configuration-default-retry) for more information | `{ retries: 10 }` |
#### <a name="consuming-messages-pause-resume"></a> Pause & Resume
### <a name="consuming-messages-pause-resume"></a> Pause & Resume
In order to pause and resume consuming from one or more topics, the `Consumer` provides the methods `pause` and `resume`. Note that pausing a topic means that it won't be fetched in the next cycle. You may still receive messages for the topic within the current batch.
Calling `pause` with a topic that the consumer is not subscribed to is a no-op, calling `resume` with a topic that is not paused is also a no-op.
Example: A situation where this could be useful is when an external dependency used by the consumer is under too much load. Here we want to `pause` consumption from a topic when this happens, and after a predefined interval we `resume` again:
```javascript
await consumer.connect()
await consumer.subscribe({ topic: 'jobs' })
await consumer.subscribe({ topic: 'pause' })
await consumer.subscribe({ topic: 'resume' })
await consumer.run({ eachMessage: async ({ topic, message }) => {
switch(topic) {
case 'jobs':
doSomeWork(message)
break
case 'pause':
// Stop consuming from the 'jobs' topic.
consumer.pause([{ topic: 'jobs'}])
try {
await sendToDependency(message)
} catch (e) {
if (e instanceof TooManyRequestsError) {
consumer.pause([{ topic }])
setTimeout(() => consumer.resume([{ topic }]), e.retryAfter * 1000)
}
// `pause` accepts an optional `partitions` property for each topic
// to pause consuming only specific partitions. However, this
// functionality is not currently supported by the library.
//
// consumer.pause([{ topic: 'jobs', partitions: [0, 1] }])
break
case 'resume':
// Resume consuming from the 'jobs' topic
consumer.resume([{ topic: 'jobs' }])
// `resume` accepts an optional `partitions` property for each topic
// to resume consuming only specific partitions. However, this
// functionality is not currently supported by the library.
//
// consumer.resume([{ topic: 'jobs', partitions: [0, 1] }])
break
throw e
}

@@ -546,6 +543,4 @@ }})

Calling `pause` with a topic that the consumer is not subscribed to is a no-op, as is calling `resume` with a topic that is not paused.
### <a name="consuming-messages-seek"></a> Seek
#### <a name="consuming-messages-seek"></a> Seek
To move the offset position in a topic/partition the `Consumer` provides the method `seek`. This method has to be called after the consumer is initialized and is running (after consumer#run).
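A sketch of `seek` (topic, partition, and offset are illustrative):

```javascript
await consumer.connect()
await consumer.subscribe({ topic: 'example' })

// run must be called before seek
await consumer.run({ eachMessage: async ({ message }) => { /* ... */ } })

consumer.seek({ topic: 'example', partition: 0, offset: 12384 })
```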

@@ -562,3 +557,3 @@

#### <a name="consuming-messages-custom-partition-assigner"></a> Custom partition assigner
### <a name="consuming-messages-custom-partition-assigner"></a> Custom partition assigner

@@ -621,5 +616,5 @@ It's possible to configure the strategy the consumer will use to distribute partitions amongst the consumer group. KafkaJS has a round robin assigner configured by default.

Your `protocol` method will probably look like the example but it's not implemented by default because extra data can be included as `userData`, take a look at the `MemberMetadata#encode` for more information.
Your `protocol` method will probably look like the example, but it's not implemented by default because extra data can be included as `userData`. Take a look at `MemberMetadata#encode` for more information.
Once your assigner is done add it to the list of assigners, it's important to keep the default assigner there to allow the old consumers to have a common ground with the new consumers when deploying.
Once your assigner is done, add it to the list of assigners. It's important to keep the default assigner there to allow the old consumers to have a common ground with the new consumers when deploying.
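A sketch of registering a custom assigner alongside the default one, assuming a `MyPartitionAssigner` defined as in the elided example and a round robin assigner export:

```javascript
const { PartitionAssigners: { roundRobin } } = require('kafkajs')

kafka.consumer({
  groupId: 'my-group',
  partitionAssigners: [
    MyPartitionAssigner, // hypothetical custom assigner from the example above
    roundRobin,          // keep the default so old and new consumers share a common ground
  ],
})
```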

@@ -638,3 +633,3 @@ ```javascript

#### <a name="consuming-messages-describe-group"></a> Describe group
### <a name="consuming-messages-describe-group"></a> Describe group

@@ -665,2 +660,55 @@ > Experimental - This feature may be removed or changed in new versions of KafkaJS

### <a name="consuming-messages-compression"></a> Compression
KafkaJS only supports GZIP natively, but [other codecs can be supported](#producing-messages-compression-other).
## <a name="admin"></a> Admin
The admin client will host all the cluster operations, such as `createTopics`, `createPartitions`, etc. Currently, only `createTopics` is available.
```javascript
const kafka = new Kafka(...)
const admin = kafka.admin() // kafka.admin({ retry: { retries: 2 } })
// remember to connect/disconnect the client
await admin.connect()
await admin.disconnect()
```
The option `retry` can be used to customize the configuration for the admin.
Take a look at [Retry](#configuration-default-retry) for more information.
### <a name="admin-create-topics"></a> Create topics
`createTopics` will resolve to `true` if the topic was created successfully or `false` if it already exists. The method will throw exceptions in case of errors.
```javascript
await admin.createTopics({
validateOnly: <boolean>,
waitForLeaders: <boolean>,
timeout: <Number>,
topics: <Topic[]>,
})
```
`Topic` structure:
```javascript
{
topic: <String>,
numPartitions: <Number>, // default: 1
replicationFactor: <Number>, // default: 1
replicaAssignment: <Array>, // Example: [{ partition: 0, replicas: [0,1,2] }] - default: []
configEntries: <Array> // Example: [{ name: 'cleanup.policy', value: 'compact' }] - default: []
}
```
| property | description | default |
| -------------- | ----------- | ------- |
| topics | Topic definition | |
| validateOnly | If this is `true`, the request will be validated, but the topic won't be created. | false |
| timeout | The time in ms to wait for a topic to be completely created on the controller node | 5000 |
| waitForLeaders | If this is `true` it will wait until metadata for the new topics doesn't throw `LEADER_NOT_AVAILABLE` | true |
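A concrete call, as a sketch (topic name and counts are illustrative):

```javascript
await admin.createTopics({
  waitForLeaders: true,
  topics: [
    { topic: 'my-topic', numPartitions: 3, replicationFactor: 1 },
  ],
})
```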
## <a name="instrumentation"></a> Instrumentation

@@ -696,14 +744,11 @@

## <a name="custom-logger"></a> Custom logger
## <a name="custom-logging"></a> Custom logging
The logger is customized using log creators. A log creator is a function which receives a log level and returns a log function. The log function receives namespace, level, label, and log.
`namespace` identifies the component which is performing the log, for example, connection or consumer.
- `namespace` identifies the component which is performing the log, for example, connection or consumer.
- `level` is the log level of the log entry.
- `label` is a text representation of the log level, example: 'INFO'.
- `log` is an object with the following keys: `timestamp`, `logger`, `message`, and the extra keys given by the user. (`logger.info('test', { extra_data: true })`)
`level` is the log level of the log entry.
`label` is a text representation of the log level, example: 'INFO'.
`log` is an object with the following keys: `timestamp`, `logger`, `message`, and the extra keys given by the user. (`logger.info('test', { extra_data: true })`)
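As a sketch, a log creator with this shape might look like the following; the `console`-based output is only for illustration, and it assumes the client accepts a `logCreator` option:

```javascript
const MyLogCreator = logLevel => ({ namespace, level, label, log }) => {
  const { message, ...extra } = log
  // forward to your favorite logger library here
  console.log(`[${label}] ${namespace} ${message}`, extra)
}

const kafka = new Kafka({
  brokers: ['kafka1:9092'],
  logCreator: MyLogCreator,
})
```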
```javascript

@@ -730,3 +775,3 @@ {

Example using [winston](https://github.com/winstonjs/winston):
Example using [Winston](https://github.com/winstonjs/winston):

@@ -778,5 +823,36 @@ ```javascript

## <a name="configuration-default-retry-detailed"></a> Retry (detailed)
The retry mechanism uses a randomization function that grows exponentially. The formula and how the default values affect it are best described by the example below:
- 1st retry:
- Always a flat `initialRetryTime` ms
- Default: `300ms`
- Nth retry:
- Formula: `Random(previousRetryTime * (1 - factor), previousRetryTime * (1 + factor)) * multiplier`
- N = 1:
- Since `previousRetryTime == initialRetryTime` just plug the values in the formula:
- Random(300 * (1 - 0.2), 300 * (1 + 0.2)) * 2 => Random(240, 360) * 2 => (480, 720) ms
- Hence, somewhere between `480ms` and `720ms`
- N = 2:
- Since `previousRetryTime` from N = 1 was in a range between 480ms and 720ms, the retry for this step will be in the range of:
- `previousRetryTime = 480ms` => Random(480 * (1 - 0.2), 480 * (1 + 0.2)) * 2 => Random(384, 576) * 2 => (768, 1152) ms
- `previousRetryTime = 720ms` => Random(720 * (1 - 0.2), 720 * (1 + 0.2)) * 2 => Random(576, 864) * 2 => (1152, 1728) ms
- Hence, somewhere between `768ms` and `1728ms`
- And so on...
Table of retry times for default values:
| Retry # | min (ms) | max (ms) |
| ------- | -------- | -------- |
| 1 | 300 | 300 |
| 2 | 480 | 720 |
| 3 | 768 | 1728 |
| 4 | 1229 | 4147 |
| 5 | 1966 | 9953 |
## <a name="development"></a> Development
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
https://kafka.apache.org/protocol.html

@@ -793,2 +869,3 @@

./scripts/dockerComposeUp.sh
./scripts/createScramCredentials.sh
yarn test:local

@@ -800,3 +877,3 @@

Password for test keystore and certificates: `testtest`
Password for test keystore and certificates: `testtest`
Password for SASL `test:testtest`

@@ -806,3 +883,3 @@

Thanks to [Sebastian Norde](https://github.com/sebastiannorde) for the awesome logo!
Thanks to [Sebastian Norde](https://github.com/sebastiannorde) for the logo ❤️

@@ -809,0 +886,0 @@ ## License

@@ -337,2 +337,23 @@ const { Types: Compression } = require('../protocol/message/compression')

}
/**
* @public
* @param {Array} topics e.g:
* [
* {
* topic: 'topic-name',
* numPartitions: 1,
* replicationFactor: 1
* }
* ]
* @param {boolean} [validateOnly=false] If this is true, the request will be validated, but the topic
* won't be created
* @param {number} [timeout=5000] The time in ms to wait for a topic to be completely created
* on the controller node
* @returns {Promise}
*/
async createTopics({ topics, validateOnly = false, timeout = 5000 }) {
const createTopics = this.lookupRequest(apiKeys.CreateTopics, requests.CreateTopics)
return await this.connection.send(createTopics({ topics, validateOnly, timeout }))
}
}
const { requests, lookup } = require('../../protocol/requests')
const apiKeys = require('../../protocol/requests/apiKeys')
const PlainAuthenticator = require('./plain')
const SCRAM256Authenticator = require('./scram256')
const SCRAM512Authenticator = require('./scram512')
const { KafkaJSSASLAuthenticationError } = require('../../errors')
const AUTHENTICATORS = { PLAIN: PlainAuthenticator }
const AUTHENTICATORS = {
PLAIN: PlainAuthenticator,
'SCRAM-SHA-256': SCRAM256Authenticator,
'SCRAM-SHA-512': SCRAM512Authenticator,
}
const SUPPORTED_MECHANISMS = Object.keys(AUTHENTICATORS)

@@ -8,0 +15,0 @@

@@ -165,3 +165,3 @@ const Broker = require('../broker')

/**
* @private
* @public
* @returns {Promise<Broker>}

@@ -168,0 +168,0 @@ */

@@ -7,4 +7,6 @@ const BrokerPool = require('./brokerPool')

KafkaJSError,
KafkaJSBrokerNotFound,
KafkaJSMetadataNotLoaded,
KafkaJSTopicMetadataNotLoaded,
KafkaJSGroupCoordinatorNotFound,
KafkaJSTopicMetadataNotLoaded,
} = require('../errors')

@@ -90,3 +92,3 @@

this.targetTopics.add(topic)
const hasChanged = previousSize !== this.targetTopics.size || !this.metadata
const hasChanged = previousSize !== this.targetTopics.size || !this.brokerPool.metadata

@@ -109,2 +111,25 @@ if (hasChanged) {

* @public
* @returns {Promise<Broker>}
*/
async findControllerBroker() {
const { metadata } = this.brokerPool
// controllerId is an int32, it's safe to cast to Number
if (!metadata || Number.isNaN(Number(metadata.controllerId))) {
throw new KafkaJSMetadataNotLoaded('Topic metadata not loaded')
}
const broker = await this.findBroker({ nodeId: metadata.controllerId })
if (!broker) {
throw new KafkaJSBrokerNotFound(
`Controller broker with id ${metadata.controllerId} not found in the cached metadata`
)
}
return broker
}
/**
* @public
* @param {string} topic

@@ -111,0 +136,0 @@ * @returns {Object} Example:

@@ -49,3 +49,4 @@ class KafkaJSError extends Error {

class KafkaJSTopicMetadataNotLoaded extends KafkaJSError {
class KafkaJSMetadataNotLoaded extends KafkaJSError {}
class KafkaJSTopicMetadataNotLoaded extends KafkaJSMetadataNotLoaded {
constructor(e, { topic } = {}) {

@@ -62,2 +63,3 @@ super(e)

class KafkaJSNotImplemented extends KafkaJSNonRetriableError {}
class KafkaJSTimeout extends KafkaJSNonRetriableError {}

@@ -76,3 +78,5 @@ module.exports = {

KafkaJSNotImplemented,
KafkaJSMetadataNotLoaded,
KafkaJSTopicMetadataNotLoaded,
KafkaJSTimeout,
}

@@ -6,2 +6,3 @@ const { createLogger, LEVELS: { INFO } } = require('./loggers')

const createConsumer = require('./consumer')
const createAdmin = require('./admin')

@@ -78,2 +79,14 @@ const { assign } = Object

}
/**
* @public
*/
admin({ retry } = {}) {
const cluster = this.createCluster()
return createAdmin({
retry: assign({}, cluster.retry, retry),
logger: this.logger,
cluster,
})
}
}

@@ -53,2 +53,3 @@ const createRetry = require('../retry')

this.authHandlers = null
this.authExpectResponse = false

@@ -177,3 +178,4 @@ const log = level => (message, extra = {}) => {

*/
authenticate({ request, response }) {
authenticate({ authExpectResponse = false, request, response }) {
this.authExpectResponse = authExpectResponse
return new Promise(async (resolve, reject) => {

@@ -183,2 +185,4 @@ this.authHandlers = {

this.authHandlers = null
this.authExpectResponse = false
response

@@ -191,2 +195,4 @@ .decode(rawData)

this.authHandlers = null
this.authExpectResponse = false
reject(

@@ -299,3 +305,3 @@ new KafkaJSConnectionError('Connection closed by the server', {

processData(rawData) {
if (this.authHandlers) {
if (this.authHandlers && !this.authExpectResponse) {
return this.authHandlers.onSuccess(rawData)

@@ -321,2 +327,7 @@ }

this.buffer = Buffer.alloc(0)
if (this.authHandlers) {
return this.authHandlers.onSuccess(data)
}
const correlationId = decoder.readInt32()

@@ -323,0 +334,0 @@ const payload = decoder.readAll()

@@ -16,2 +16,10 @@ const Long = require('long')

static decodeZigZag(value) {
return (value >>> 1) ^ -(value & 1)
}
static decodeZigZag64(longValue) {
return longValue.shiftRightUnsigned(1).xor(longValue.and(Long.fromInt(1)).negate())
}
constructor(buffer) {

@@ -69,2 +77,15 @@ this.buffer = buffer

readVarIntString() {
const byteLength = this.readVarInt()
if (byteLength === -1) {
return null
}
const stringBuffer = this.buffer.slice(this.offset, this.offset + byteLength)
const value = stringBuffer.toString('utf8')
this.offset += byteLength
return value
}
canReadBytes(length) {

@@ -86,2 +107,14 @@ return Buffer.byteLength(this.buffer) - this.offset >= length

readVarIntBytes() {
const byteLength = this.readVarInt()
if (byteLength === -1) {
return null
}
const stringBuffer = this.buffer.slice(this.offset, this.offset + byteLength)
this.offset += byteLength
return stringBuffer
}
readBoolean() {

@@ -129,3 +162,3 @@ return this.readInt8() === 1

readSignedVarInt32() {
readVarInt() {
let currentByte

@@ -141,10 +174,6 @@ let result = 0

return this.decodeZigZag(result)
return Decoder.decodeZigZag(result)
}
decodeZigZag(value) {
return (value >>> 1) ^ -(value & 1)
}
readSignedVarInt64() {
readVarLong() {
let currentByte

@@ -160,9 +189,5 @@ let result = Long.fromInt(0)

return this.decodeZigZag64(result)
return Decoder.decodeZigZag64(result)
}
decodeZigZag64(longValue) {
return longValue.shiftRightUnsigned(1).xor(longValue.and(Long.fromInt(1)).negate())
}
slice(size) {

@@ -169,0 +194,0 @@ return new Decoder(this.buffer.slice(this.offset, this.offset + size))

@@ -14,2 +14,45 @@ const Long = require('long')

module.exports = class Encoder {
static encodeZigZag(value) {
return (value << 1) ^ (value >> 31)
}
static encodeZigZag64(value) {
const longValue = Long.fromValue(value)
return longValue.shiftLeft(1).xor(longValue.shiftRight(63))
}
static sizeOfVarInt(value) {
let encodedValue = this.encodeZigZag(value)
let bytes = 1
while ((encodedValue & UNSIGNED_INT32_MAX_NUMBER) !== 0) {
bytes += 1
encodedValue >>>= 7
}
return bytes
}
static sizeOfVarLong(value) {
let longValue = Encoder.encodeZigZag64(value)
let bytes = 1
while (longValue.and(UNSIGNED_INT64_MAX_NUMBER).notEquals(Long.fromInt(0))) {
bytes += 1
longValue = longValue.shiftRightUnsigned(7)
}
return bytes
}
static sizeOfVarIntBytes(value) {
const size = value === null ? -1 : Buffer.byteLength(value)
if (size < 0) {
return Encoder.sizeOfVarInt(-1)
}
return Encoder.sizeOfVarInt(size) + size
}
constructor() {

@@ -40,2 +83,9 @@ this.buffer = Buffer.alloc(0)

writeUInt32(value) {
const tempBuffer = Buffer.alloc(INT32_SIZE)
tempBuffer.writeUInt32BE(value)
this.buffer = Buffer.concat([this.buffer, tempBuffer])
return this
}
writeInt64(value) {

@@ -69,2 +119,16 @@ const tempBuffer = Buffer.alloc(INT64_SIZE)

writeVarIntString(value) {
if (value == null) {
this.writeVarInt(-1)
return this
}
const byteLength = Buffer.byteLength(value, 'utf8')
this.writeVarInt(byteLength)
const tempBuffer = Buffer.alloc(byteLength)
tempBuffer.write(value, 0, byteLength, 'utf8')
this.buffer = Buffer.concat([this.buffer, tempBuffer])
return this
}
writeBytes(value) {

@@ -92,2 +156,24 @@ if (value == null) {

writeVarIntBytes(value) {
if (value == null) {
this.writeVarInt(-1)
return this
}
if (Buffer.isBuffer(value)) {
// raw bytes
this.writeVarInt(value.length)
this.buffer = Buffer.concat([this.buffer, value])
} else {
const valueToWrite = String(value)
const byteLength = Buffer.byteLength(valueToWrite, 'utf8')
this.writeVarInt(byteLength)
const tempBuffer = Buffer.alloc(byteLength)
tempBuffer.write(valueToWrite, 0, byteLength, 'utf8')
this.buffer = Buffer.concat([this.buffer, tempBuffer])
}
return this
}
writeEncoder(value) {

@@ -121,7 +207,19 @@ if (value instanceof Encoder !== true) {

writeVarIntArray(array, type) {
this.writeVarInt(array.length)
array.forEach(value => {
switch (type || typeof value) {
case 'object':
this.writeEncoder(value)
break
}
})
return this
}
// Based on:
// https://github.com/addthis/stream-lib/blob/master/src/main/java/com/clearspring/analytics/util/Varint.java#L106
writeSignedVarInt32(value) {
writeVarInt(value) {
const byteArray = []
let encodedValue = this.encodeZigZag(value)
let encodedValue = Encoder.encodeZigZag(value)

@@ -138,9 +236,5 @@ while ((encodedValue & UNSIGNED_INT32_MAX_NUMBER) !== 0) {

encodeZigZag(value) {
return (value << 1) ^ (value >> 31)
}
writeSignedVarInt64(value) {
writeVarLong(value) {
const byteArray = []
let longValue = this.encodeZigZag64(value)
let longValue = Encoder.encodeZigZag64(value)

@@ -163,7 +257,2 @@ while (longValue.and(UNSIGNED_INT64_MAX_NUMBER).notEquals(Long.fromInt(0))) {

encodeZigZag64(value) {
const longValue = Long.fromValue(value)
return longValue.shiftLeft(1).xor(longValue.shiftRight(63))
}
size() {

@@ -170,0 +259,0 @@ return Buffer.byteLength(this.buffer)

@@ -21,3 +21,3 @@ const requests = {

ApiVersions: require('./apiVersions'),
CreateTopics: {},
CreateTopics: require('./createTopics'),
DeleteTopics: {},

@@ -24,0 +24,0 @@ }

