
kafka-node-tools
Advanced tools
This module is a wrapper around node-rdkafka. For all broker configuration options, see the node-rdkafka documentation: https://github.com/Blizzard/node-rdkafka
{
  "client.id": "myClientId",
  "metadata.broker.list": "kafka.com",
  "compression.codec": "gzip",
  "retry.backoff.ms": 200,
  "message.send.max.retries": 10,
  "socket.keepalive.enable": true,
  "queue.buffering.max.messages": 100000,
  "queue.buffering.max.ms": 1000,
  "batch.num.messages": 10,
  "dr_cb": true
}
Sends a message to Kafka and resolves the promise without waiting for the delivery report.
produce(topic, partition, msg, key, timestamp, opaque)
topic Topic to send the message to.
partition Optional partition for the message. Defaults to -1, which uses librdkafka's default partitioner (consistent random for keyed messages, random for unkeyed messages).
msg Message to send. Must be a Buffer, e.g. Buffer.from('My message').
key Key for keyed messages. Optional.
timestamp Optional timestamp. It is added to the message if your broker version supports it; otherwise it defaults to 0.
opaque Optional opaque token, which is passed along to your delivery reports.
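As a rough illustration of the argument order above, the sketch below assembles the six positional arguments from a plain object. `buildProduceArgs` is a hypothetical helper and not part of kafka-node-tools, and JSON encoding of the payload is an assumption of this example, not something the module requires.

```javascript
// Hypothetical helper, not part of kafka-node-tools: builds the positional
// argument list for produce(topic, partition, msg, key, timestamp, opaque).
function buildProduceArgs(topic, payload, options = {}) {
  const {
    partition = -1, // -1 lets librdkafka's default partitioner choose
    key = null,     // optional; only needed for keyed messages
    timestamp = 0,  // 0 is the documented default
    opaque = null   // optional token passed along to delivery reports
  } = options;

  // msg must be a Buffer; JSON encoding is an assumption of this sketch.
  const msg = Buffer.from(JSON.stringify(payload));
  return [topic, partition, msg, key, timestamp, opaque];
}

const args = buildProduceArgs('my-topic', { id: 1 }, { key: 'user-1' });
```

With a connected producer, the result could then be spread into the call, for example `producer.produce(...args)`.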
Sends a message to Kafka and resolves the promise once the delivery report is received, or rejects it if the timeout expires.
secureProduce(topic, partition, msg, key, timestamp, opaque)
topic Topic to send the message to.
partition Optional partition for the message. Defaults to -1, which uses librdkafka's default partitioner (consistent random for keyed messages, random for unkeyed messages).
msg Message to send. Must be a Buffer, e.g. Buffer.from('My message').
key Key for keyed messages. Optional.
timestamp Optional timestamp. It is added to the message if your broker version supports it; otherwise it defaults to 0.
opaque Optional opaque token, which is passed along to your delivery reports.
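The "resolve on delivery report, reject on timeout" behaviour can be pictured as a race between two promises. The sketch below is not taken from kafka-node-tools; `withTimeout` and `fakeDelivery` are hypothetical names, and the delivery report is simulated so the example runs without a broker.

```javascript
// Sketch only: one way to settle a promise on a delivery report or a timeout.
// This is NOT the library's implementation; all names are hypothetical.
function withTimeout(promise, ms) {
  let timer;
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(
      () => reject(new Error('Delivery report timed out after ' + ms + ' ms')),
      ms
    );
  });
  // Whichever settles first wins; the timer is always cleared afterwards.
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
}

// Stand-in for a delivery report that arrives almost immediately.
const fakeDelivery = new Promise((resolve) => {
  setImmediate(() => resolve({ offset: 42 }));
});

const pending = withTimeout(fakeDelivery, 1000);
pending.then((report) => console.log('delivered at offset', report.offset));
```

Clearing the timer in `finally` keeps a late timeout from holding the process open after the report has already arrived.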
{
  'group.id': process.env.KAFKA_CONSUMER_ID,
  'metadata.broker.list': process.env.KAFKA_BROKER_LIST,
  'enable.auto.commit': false,
  'offset_commit_cb': function(err, topicPartitions) {
    if (err) {
      // There was an error committing
    } else {
      // Commit went through. Let's log the topic partitions
    }
  }
}
{ 'auto.offset.reset': 'earliest' }
Default configuration
{
  'maxBatch': 500,
  'batchInterval': 1000,
  'batchInc': 10,
  'batchDec': 50,
  'maxSystemMessages': 100
}
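If only some of these values need changing, one plausible approach is to overlay the overrides on the defaults. The sketch below assumes a simple shallow merge; `resolveBatchOptions` is a hypothetical helper, and whether the library merges options this way internally is not confirmed by this document.

```javascript
// Values copied from the default configuration listed above.
const DEFAULT_BATCH_OPTIONS = {
  maxBatch: 500,
  batchInterval: 1000,
  batchInc: 10,
  batchDec: 50,
  maxSystemMessages: 100
};

// Hypothetical helper: shallow-merges user overrides over the defaults
// without mutating either input.
function resolveBatchOptions(overrides = {}) {
  return Object.assign({}, DEFAULT_BATCH_OPTIONS, overrides);
}

const batchOptions = resolveBatchOptions({ maxBatch: 200 });
```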
Example:
"options": {
  "global": {
    "group.id": "myGroupId",
    "metadata.broker.list": "kafka.com",
    "enable.auto.commit": false
  },
  "topic": {
    "auto.offset.reset": "earliest"
  },
  "batchConsumer": {
    "maxBatch": 500,
    "batchInterval": 1000,
    "batchInc": 10,
    "batchDec": 50,
    "maxSystemMessages": 100
  }
}
consume(handler, onError, topics)
batchConsume(handler, onError, topics)
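A handler passed to either function might look roughly like the sketch below. It assumes the handler receives a single message with the node-rdkafka shape (`value` as a Buffer, plus `partition` and `offset`) and returns a promise; `decodeMessage` is a hypothetical helper, and the sample message is fabricated for illustration only.

```javascript
// Hypothetical helper: turns a consumed message into a plain object.
// Assumes the node-rdkafka message shape: { value: Buffer, partition, offset }.
function decodeMessage(msg) {
  return {
    id: msg.partition + '-' + msg.offset,
    body: JSON.parse(msg.value.toString())
  };
}

// A handler in the style passed to consume(): it returns a promise.
const handler = (msg) => Promise.resolve(decodeMessage(msg));

// Fake message for illustration; no broker is involved.
const sample = {
  value: Buffer.from('{"hello":"world"}'),
  partition: 0,
  offset: 7
};
const decoded = decodeMessage(sample);
```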
const KafkaTools = require('kafka-node-tools');
const Config = require('./config');

// Connect the consumer and the producer in parallel.
Promise.all([
  KafkaTools.Consumer.connect(Config.consumerOptions),
  KafkaTools.Producer.connect(Config.producerOptions)
])
  .then(([consumer, producer]) => {
    // Re-publish each consumed message and wait for its delivery report.
    const handler = (msg) => {
      // Assumption: msg.value is a Buffer holding JSON, as in node-rdkafka.
      const parsedMsg = JSON.parse(msg.value.toString());
      return producer.secureProduce(
        Config.producerTopic, null,
        Buffer.from(JSON.stringify(parsedMsg)), null,
        Date.now(),
        null)
        .then(() => {
          console.log('Finished processing ' + msg.partition + '-' + msg.offset);
        });
    };
    // The error callback in this example returns a resolved promise.
    const onError = () => Promise.resolve('OnError handler finished');
    consumer.batchConsume(handler, onError, Config.consumerTopics);
  })
  .catch((error) => {
    // Connection failed: log the error and exit.
    console.log(error);
    process.exit(1);
  });
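The example above requires a `./config` module that is not shown. A minimal sketch of such a module follows. The option shapes accepted by `connect()` are assumed from the configuration fragments earlier in this README, and the topic names and the `localhost:9092` fallback are placeholders.

```javascript
// config.js: a sketch only. The exact option shapes expected by
// Consumer.connect() and Producer.connect() are assumptions.
const brokers = process.env.KAFKA_BROKER_LIST || 'localhost:9092';

const Config = {
  consumerOptions: {
    global: {
      'group.id': process.env.KAFKA_CONSUMER_ID || 'myGroupId',
      'metadata.broker.list': brokers,
      'enable.auto.commit': false
    },
    topic: { 'auto.offset.reset': 'earliest' },
    batchConsumer: {
      maxBatch: 500,
      batchInterval: 1000,
      batchInc: 10,
      batchDec: 50,
      maxSystemMessages: 100
    }
  },
  producerOptions: {
    'client.id': 'myClientId',
    'metadata.broker.list': brokers,
    'dr_cb': true
  },
  // Placeholder topic names for illustration.
  consumerTopics: ['input-topic'],
  producerTopic: 'output-topic'
};

module.exports = Config;
```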