
kafka-native
The kafka-native client provides consume and produce functionality for Kafka, using the librdkafka native library for performance. Periodic stats on local and Kafka queue lengths are provided for monitoring and analysis. Message offset progress is atomically recorded to local storage. Multi-core consume-side processing is possible via the Node.js cluster module.
var kafka_native = require('kafka-native');

var broker = 'localhost:9092';
var topic = 'example';

// A Producer needs only a broker to connect to.
var producer = new kafka_native.Producer({
    broker: broker
});

// A Consumer also needs a topic and a local directory where processed
// message offsets are recorded (one file per topic-partition).
var consumer = new kafka_native.Consumer({
    broker: broker,
    topic: topic,
    offset_directory: './kafka-offsets',
    receive_callback: function(data) {
        data.messages.forEach(function(m) {
            console.log('message: ', m.topic, m.partition, m.offset, m.payload);
        });
        return Promise.resolve();
    }
});

// Round-robin one message per second across the topic's partitions.
producer.partition_count(topic)
    .then(function(npartitions) {
        var partition = 0;
        setInterval(function() {
            producer.send(topic, partition++ % npartitions, ['hello']);
        }, 1000);
        return consumer.start();
    });
See the examples directory for more.
Install via npm, which will install dependencies and compile the node addon:
$ npm install kafka-native
If needed, the script scripts/test-server.sh can be used to create a test/dev Kafka instance. This uses docker-compose to launch Kafka and ZooKeeper containers. If you're using Mac/Windows, make sure you've run the appropriate eval $(docker-machine env ...) command before launching the test-server control script.
$ # Mac/Windows: eval $(docker-machine env <your-default-machine-name>)
$ eval $(scripts/test-server.sh start)
$ npm test
$ scripts/test-server.sh stop
If you already have a running Kafka instance, you can export its broker list to NODE_KAFKA_NATIVE_BROKER before running the tests or examples. Note that the mocha tests will fail if automatic topic creation is not allowed, or if topics are created with just one partition.
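For example, with a broker already listening on localhost:9092:
$ export NODE_KAFKA_NATIVE_BROKER=localhost:9092
$ npm test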
Code coverage report is available by running:
$ npm test --coverage
You can then browse locally to $REPO/coverage/lcov-report/index.html to see Istanbul's report.
Creating a new Producer requires only a broker, after which you can push messages into Kafka via the producer.send(topic, partition, messages) call.
To find the number of partitions for a topic, use the producer.partition_count(topic) call. It's up to the user of Producer to determine how it wants to distribute messages across a topic's partitions; the examples in the examples directory all round-robin messages, as shown in the example above.
The periodic stats from the Producer report the queue length of messages awaiting transmission via send_queue_length. This is also reported from each send call. You can cap the maximum number of locally enqueued messages via the queue_buffering_max_messages option; attempts to send messages when over this limit will cause the messages to be dropped.
The full librdkafka JSON statistics object is returned as rdkafka_stats, in case you want to parse out further stats.
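A minimal sketch of these options, assuming the stats info object exposes the fields described above (the 10000 cap is just an illustrative value):
var kafka_native = require('kafka-native');

var producer = new kafka_native.Producer({
    broker: 'localhost:9092',
    // Illustrative cap: sends attempted while 10000 messages are already queued locally are dropped.
    queue_buffering_max_messages: 10000,
    stats_callback: function(info) {
        // Periodic stats: messages still awaiting transmission, plus the raw librdkafka stats.
        console.log('send_queue_length:', info.send_queue_length);
        console.log('rdkafka stats:', info.rdkafka_stats);
    }
});

producer.send('example', 0, ['hello']);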
The Consumer requires a local directory to record processed message offsets, as well as the broker and topic to use. Once a Consumer has been created, its start() method will initiate pulling messages from Kafka, and feeding them back to the configured receive_callback. The Consumer will keep trying to pull messages from Kafka in the background, up to the number of kilobytes of memory specified by the queued_max_messages_kbytes option, so that it can feed the receive_callback as soon as possible.
When your receive_callback returns - or, if it returns a Promise, when that Promise is fulfilled or rejected - the message offsets will be recorded locally, to ensure that any future Consumer instances don't ask for that message offset again. This information is recorded as a set of files in the offset_directory, one per topic-partition.
Flow control is possible via the pause() and resume() methods. With these methods, you can easily match the rate of pulling new messages from Kafka with your processing rate:
var consumer = new Consumer({
    receive_callback: function(data) {
        // Stop pulling new messages until this batch has been processed.
        consumer.pause();
        return process_messages(data.messages)
            .finally(function() {
                consumer.resume();
            });
    }
});
For Kafka topics with multiple partitions, you may want multiple consumers processing the topic. At Consumer initialization time, via the num_workers and worker_slot options, you may configure the Consumer to work with only a subset of the available partitions. See examples/node-cluster.js for a working example that uses the node cluster module to automatically create several consumer processes.
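A simplified sketch of that pattern, assuming num_workers and worker_slot just need to agree across the forked processes (see examples/node-cluster.js for the complete version):
var cluster = require('cluster');
var kafka_native = require('kafka-native');

var NUM_WORKERS = 4; // illustrative worker count

if (cluster.isMaster) {
    // Fork one process per worker slot.
    for (var i = 0; i < NUM_WORKERS; i++) {
        cluster.fork({ WORKER_SLOT: i });
    }
} else {
    var consumer = new kafka_native.Consumer({
        broker: 'localhost:9092',
        topic: 'example',
        offset_directory: './kafka-offsets',
        // Each worker consumes only its slice of the topic's partitions.
        num_workers: NUM_WORKERS,
        worker_slot: parseInt(process.env.WORKER_SLOT, 10),
        receive_callback: function(data) {
            console.log('worker', process.env.WORKER_SLOT, 'received', data.messages.length, 'messages');
            return Promise.resolve();
        }
    });
    consumer.start();
}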
The Consumer will report periodic stats if the stats_callback option is supplied:
waiting_kafka: The number of messages that are sitting at the Kafka broker waiting to be pulled.
waiting_local: The number of messages that have been pulled from Kafka, and are sitting in local memory, but have not yet been handed to the caller's receive_callback.
kafka_log_size: The number of messages sitting in Kafka's log storage for the Consumer's partitions. This number does include messages that have already been pulled and processed, so it's mostly useful to ensure your broker message expiration parameters are working as you expect.
Note that Consumer stats report only on their slice of the topic, as configured via the num_workers and worker_slot options. That is, if you configure multiple Consumers to split the work on a topic, you'll want to sum/aggregate their statistics in your analytics backend to get a full picture of topic processing.
The full librdkafka JSON statistics object is returned as rdkafka_stats, in case you want to parse out further stats.
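A small sketch of wiring up the stats callback, assuming the info object carries the fields listed above:
var consumer = new kafka_native.Consumer({
    broker: 'localhost:9092',
    topic: 'example',
    offset_directory: './kafka-offsets',
    receive_callback: function(data) {
        return Promise.resolve();
    },
    stats_callback: function(info) {
        // Per-worker view of its topic slice; aggregate across workers for the full picture.
        console.log('waiting_kafka:', info.waiting_kafka,
            'waiting_local:', info.waiting_local,
            'kafka_log_size:', info.kafka_log_size);
    }
});
consumer.start();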
Although Kafka can support binary keys and payloads, the consumer will try to stringify any payload received, which won't be useful if you're pushing binary data into a topic. There's currently no way to produce key/payload pairs; the Producer's send call only takes an array of payloads at this time.
The librdkafka version in use doesn't currently support any interaction with ZooKeeper, nor is its broker offset commit functionality useful. This is why the Consumer manages its offsets locally.
var producer = new Producer(options);
options:
stats_callback(info)
info:
delivery_report_callback(info)
info:
producer.send(topic, partitions, payloads)
producer.partition_count(topic)
producer.stop(options)
var consumer = new Consumer(options);
options:
receive_callback(info)
info:
stats_callback(info)
info:
waiting_kafka: The number of messages that are sitting at the Kafka broker waiting to be pulled.
waiting_local: The number of messages that have been pulled from Kafka, and are sitting in local memory, but have not yet been handed to the caller's receive_callback.
kafka_log_size: The number of messages sitting in Kafka's log storage for the Consumer's partitions. This number does include messages that have already been pulled and processed, so it's mostly useful to ensure your broker message expiration parameters are working as you expect.
consumer.start()
consumer.pause()
Prevents further receive_callback invocations until a consumer.resume() call.
consumer.resume()
Re-allows receive_callback invocations after using consumer.pause().
consumer.stop()
The wrapper library and addon are licensed under the MIT License.
The repository also includes a copy of the librdkafka library (from https://github.com/edenhill/librdkafka) which is licensed as specified in deps/librdkafka/LICENSE.