
**KafkaTS** is an Apache Kafka client library for Node.js. It provides both a low-level API for communicating directly with the Apache Kafka cluster and high-level APIs for publishing and subscribing to Kafka topics.
Supported Kafka versions: ^3.6.x, ^4.0.0
```sh
npm install kafka-ts
```
```ts
import { createKafkaClient } from 'kafka-ts';

export const kafka = createKafkaClient({
    clientId: 'my-app',
    bootstrapServers: [{ host: 'localhost', port: 9092 }],
});
```
```ts
const consumer = await kafka.startConsumer({
    groupId: 'my-consumer-group',
    topics: ['my-topic'],
    onBatch: (messages) => {
        console.log(messages);
    },
});
```
```ts
export const producer = kafka.createProducer();

await producer.send([{ topic: 'my-topic', key: 'key', value: 'value' }]);
```
```ts
import { API } from 'kafka-ts';

const cluster = kafka.createCluster();
await cluster.connect();

const { controllerId } = await cluster.sendRequest(API.METADATA, {
    allowTopicAutoCreation: false,
    includeTopicAuthorizedOperations: false,
    topics: [],
});

await cluster.sendRequestToNode(controllerId)(API.CREATE_TOPICS, {
    validateOnly: false,
    timeoutMs: 10_000,
    topics: [
        {
            name: 'my-topic',
            numPartitions: 10,
            replicationFactor: 3,
            assignments: [],
            configs: [],
        },
    ],
});

await cluster.disconnect();
```
```ts
process.once('SIGTERM', async () => {
    await consumer.close(); // waits for the consumer to finish processing the last batch and disconnects
    await producer.close();
});
```
See the examples directory for more detailed usage examples.
By default, KafkaTS logs using a JSON logger. This can be replaced globally by calling the setLogger method (see src/utils/logger.ts).
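A rough sketch of swapping the logger, assuming setLogger is exported from kafka-ts and accepts an object with the usual level methods (the exact Logger interface is defined in src/utils/logger.ts, so check it before copying this):

```ts
import { setLogger } from 'kafka-ts';

// Hypothetical logger shape: verify the actual interface in src/utils/logger.ts.
setLogger({
    debug: (message, metadata) => console.debug(message, metadata),
    info: (message, metadata) => console.info(message, metadata),
    warn: (message, metadata) => console.warn(message, metadata),
    error: (message, metadata) => console.error(message, metadata),
});
```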
By default, KafkaTS retries onBatch with an exponential backoff delay, up to 5 times (see src/utils/retrier.ts). If all retries fail, the consumer is restarted.
If you want to skip failed messages or implement a DLQ-like mechanism, you can override the retrier in startConsumer() and run your own logic in onFailure.
For example, to simply skip the failing messages:
```ts
import { createExponentialBackoffRetrier } from 'kafka-ts';

await kafka.startConsumer({
    // ...
    retrier: createExponentialBackoffRetrier({ onFailure: () => {} }),
});
```
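A DLQ-like handler can be built on the same hook. The sketch below assumes onFailure receives the error and the failed batch (the exact callback signature is defined in src/utils/retrier.ts, so verify it first); the dead-letter topic name is a placeholder:

```ts
import { createExponentialBackoffRetrier } from 'kafka-ts';

const deadLetterProducer = kafka.createProducer();

// Assumption: onFailure is called with the error and the failed batch of messages.
const dlqRetrier = createExponentialBackoffRetrier({
    onFailure: async (error, batch) => {
        // Forward the failed messages to a dead-letter topic instead of blocking the partition.
        await deadLetterProducer.send(
            batch.map((message) => ({
                topic: 'my-topic.dlq', // placeholder topic name
                key: message.key,
                value: message.value,
                headers: { ...message.headers, 'x-error': String(error) },
            })),
        );
    },
});

await kafka.startConsumer({
    // ...
    retrier: dlqRetrier,
});
```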
By default, messages are partitioned by the message key, or round-robin if the key is null or undefined. The partition can be overridden with the partition property on the message. You can also override the default partitioner per producer instance: kafka.createProducer({ partitioner: customPartitioner }).
A simple example of partitioning messages by the value of the x-partition-key message header:
```ts
import type { Partitioner } from 'kafka-ts';
import { defaultPartitioner } from 'kafka-ts';

const myPartitioner: Partitioner = (context) => {
    const partition = defaultPartitioner(context);
    return (message) => partition({ ...message, key: message.headers?.['x-partition-key'] });
};

const producer = kafka.createProducer({ partitioner: myPartitioner });
await producer.send([{ topic: 'my-topic', value: 'value', headers: { 'x-partition-key': '123' } }]);
```
The existing low-level libraries (e.g. node-rdkafka) are bindings to librdkafka, which doesn't give enough control over the consumer logic. The existing high-level libraries (e.g. kafkajs) are missing a few crucial features.
Setting groupInstanceId in addition to groupId can avoid rebalancing and lets a restarted consumer continue consuming the partitions in its existing assignment.

createKafkaClient()

| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| clientId | string | false | null | The client id used for all requests. |
| bootstrapServers | TcpSocketConnectOpts[] | true | | List of kafka brokers for initial cluster discovery. |
| sasl | SASLProvider | false | | SASL provider. |
| ssl | TLSSocketOptions | false | | SSL configuration. |
| requestTimeout | number | false | 60000 | Request timeout in milliseconds. |
The following SASL providers are available:
- saslPlain({ username, password })
- saslScramSha256({ username, password })
- saslScramSha512({ username, password })
- oAuthBearer(oAuthAuthenticator({ endpoint, clientId, clientSecret }))

Custom SASL mechanisms can be implemented following the SASLProvider interface. See src/auth for examples.
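For instance, a client configured with SCRAM-SHA-512 over TLS might look roughly like the sketch below, assuming the helpers listed above are exported from kafka-ts; the broker address and credentials are placeholders:

```ts
import { createKafkaClient, saslScramSha512 } from 'kafka-ts';

export const kafka = createKafkaClient({
    clientId: 'my-app',
    bootstrapServers: [{ host: 'broker-1.example.com', port: 9093 }], // placeholder broker
    sasl: saslScramSha512({ username: 'my-user', password: 'my-password' }), // placeholder credentials
    ssl: {}, // TLSSocketOptions from node:tls; pass ca/cert/key here if your cluster requires them
});
```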
kafka.startConsumer()

| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| topics | string[] | true | | List of topics to subscribe to |
| groupId | string | false | null | Consumer group id |
| groupInstanceId | string | false | null | Consumer group instance id |
| rackId | string | false | null | Rack id |
| isolationLevel | IsolationLevel | false | IsolationLevel.READ_UNCOMMITTED | Isolation level |
| sessionTimeoutMs | number | false | 30000 | Session timeout in milliseconds |
| rebalanceTimeoutMs | number | false | 60000 | Rebalance timeout in milliseconds |
| maxWaitMs | number | false | 5000 | Fetch long poll timeout in milliseconds |
| minBytes | number | false | 1 | Minimum number of bytes to wait for before returning a fetch response |
| maxBytes | number | false | 1_048_576 | Maximum number of bytes to return in the fetch response |
| partitionMaxBytes | number | false | 1_048_576 | Maximum number of bytes to return per partition in the fetch response |
| allowTopicAutoCreation | boolean | false | false | Allow kafka to auto-create topic when it doesn't exist |
| fromTimestamp | bigint | false | -1 | Start consuming messages from timestamp (-1 = latest offsets, -2 = earliest offsets) |
| onBatch | (batch: Message[]) => Promise | true | | Callback executed when a batch of messages is received |
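For example, a consumer using static membership and starting from the earliest offsets could look roughly like this sketch based on the options above; the groupInstanceId value is a placeholder:

```ts
const consumer = await kafka.startConsumer({
    topics: ['my-topic'],
    groupId: 'my-consumer-group',
    groupInstanceId: 'my-consumer-group-instance-1', // static membership: rejoining keeps the existing assignment
    fromTimestamp: -2n, // -2 = earliest offsets
    onBatch: async (messages) => {
        for (const message of messages) {
            console.log(message.value?.toString());
        }
    },
});
```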
kafka.createProducer()

| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| allowTopicAutoCreation | boolean | false | false | Allow kafka to auto-create topic when it doesn't exist |
| partitioner | Partitioner | false | defaultPartitioner | Custom partitioner function. By default, a java-compatible partitioner is used. |
producer.send(messages: Message[])

| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| topic | string | true | | Topic to send the message to |
| partition | number | false | null | Partition to send the message to. By default partitioned by key. If key is also missing, partition is assigned round-robin |
| timestamp | bigint | false | null | Message timestamp in milliseconds |
| key | Buffer \| null | false | null | Message key |
| value | Buffer \| null | true | | Message value |
| headers | Record<string, string> | false | null | Message headers |
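Putting the message fields together, a send call with an explicit key, headers, and timestamp might look like the following sketch; the topic name and values are placeholders:

```ts
await producer.send([
    {
        topic: 'my-topic',
        key: Buffer.from('user-123'),
        value: Buffer.from(JSON.stringify({ event: 'signup' })),
        headers: { 'x-request-id': 'abc-123' },
        timestamp: BigInt(Date.now()), // milliseconds as bigint
    },
]);
```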