
kafka-node-topic-consumer
error handling and concurrency handling for kafka-node HighLevelConsumer (0.8)
A wrapper around kafka-node's HighLevelConsumer that provides error handling and message processing concurrency control via fastq (a lightweight, fast queue modeled after async's queue).
npm install --save kafka-node kafka-node-topic-consumer
There are two main motivations for this module:
1. The TopicConsumer will self-heal when the underlying HighLevelConsumer encounters an error, by first attempting to close the existing consumer before removing it and scheduling a rebuild at a random time in the near future (30-90 seconds). The rebuild process is infinite: if a rebuild fails, the healing process starts over.
2. kafka-node's HighLevelConsumer resembles a firehose, emitting messages as soon as they arrive, regardless of how fast the application is able to process them. To control this, the TopicConsumer implements an in-memory queue that processes a single batch of messages at a time. As soon as the underlying consumer emits the first message of a newly received batch, the TopicConsumer pauses the consumer and pushes all messages into the queue. Once the last message has been processed, it resumes the consumer.
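To make the second point concrete, here is a minimal sketch of that pause-and-queue pattern written directly against kafka-node and fastq. It is illustrative only, not the module's actual implementation, and processMessage is a hypothetical handler:

import kafka from 'kafka-node';
import fastq from 'fastq';

const client = new kafka.Client(process.env.ZOOKEEPER_HOST);
const consumer = new kafka.HighLevelConsumer(client, [{ topic: 'my-topic' }], {
  groupId: 'my-consumer-group',
});

// process one message at a time; fastq invokes cb when the worker finishes
const queue = fastq((msg, cb) => {
  processMessage(msg).then(() => cb(null), cb); // processMessage is hypothetical
}, 1);

// when the last queued message has been processed, resume the firehose
queue.drain = () => consumer.resume();

consumer.on('message', (msg) => {
  consumer.pause(); // pause as soon as the first message of a batch arrives
  queue.push(msg, (err) => {
    if (err) console.error(err);
  });
});

The TopicConsumer wraps this pattern (plus the self-healing rebuild logic from the first point) behind a single constructor: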
import TopicConsumer from 'kafka-node-topic-consumer';

// create a new TopicConsumer
const consumer = new TopicConsumer({
  host: process.env.ZOOKEEPER_HOST,
  consumer: { groupId: 'my-consumer-group' },
  topic: 'my-topic',
});

// register a worker to process each message
consumer.registerWorker((msg) => {
  console.log(msg);
  return Promise.resolve();
});

// log processing errors
consumer.on('message:error', (err, msg) => {
  console.error(err, msg);
});

// wait for the consumer to register and begin fetching
consumer.connect();
new TopicConsumer(options): instantiate a new topic consumer
name | type | description |
---|---|---|
options | Object | constructor options |
[options.concurrency] | Number | number of tasks to be processed at any given time, default is 1 |
options.consumer | Object | consumer options |
options.consumer.groupId | String | consumer group id |
options.host | String | zookeeper connection string |
[options.parse] | Function | a function (raw) => Promise for parsing raw Kafka messages before they are pushed into the queue. The default parse function will attempt to parse the raw message's value attribute as UTF-8 stringified JSON and add it as the parsedValue attribute on the message |
[options.rebuild] | Object | rebuild configuration |
[options.rebuild.closing] | Object | valid retry options for closing failed consumers |
[options.rebuild.maxDelay] | Number, String | the maximum time to wait before rebuilding, default is 2m |
[options.rebuild.minDelay] | Number, String | the minimum time to wait before rebuilding, default is 35s |
options.topic | String, Object | topic name or payload |
[options.validate] | Function | a function (parsed) => Promise for validating queue messages. Messages that fail validation will not be processed by workers |
import Bluebird from 'bluebird';
import joi from 'joi';
import TopicConsumer from 'kafka-node-topic-consumer';

const consumer = new TopicConsumer({
  host: process.env.ZOOKEEPER_HOST,
  consumer: {
    groupId: 'my-group-id',
  },
  topic: 'my-topic',
  // parse the raw message value as UTF-8 stringified JSON
  parse(raw) {
    return Bluebird.try(() => {
      return JSON.parse(raw.value.toString('utf8'));
    });
  },
  // reject (and thereby skip) any message that does not match the schema
  validate(parsed) {
    const schema = joi.object({
      id: joi.string().guid().required(),
      action: joi.string().valid('create', 'destroy', 'update').required(),
      data: joi.object().required(),
    });
    const result = joi.validate(parsed, schema);
    if (result.error) {
      return Promise.reject(result.error);
    }
    return Promise.resolve(result.value);
  },
});
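The concurrency and rebuild options from the table above can be tuned the same way; for example (values here are illustrative, formats per the option descriptions):

const tunedConsumer = new TopicConsumer({
  host: process.env.ZOOKEEPER_HOST,
  consumer: { groupId: 'my-group-id' },
  topic: 'my-topic',
  concurrency: 5,      // process up to 5 messages at a time (default is 1)
  rebuild: {
    minDelay: '35s',   // wait at least 35 seconds before rebuilding (the default)
    maxDelay: '2m',    // and at most 2 minutes (the default)
  },
});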
connect([done]): wait for a new consumer to register
name | type | description |
---|---|---|
done | Function | optional callback |
consumer.connect((err) => {});

consumer.connect()
  .then(() => {})
  .catch((err) => {});
consumer: the underlying HighLevelConsumer instance

queue: the underlying fastq queue instance
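Both are exposed so you can reach through the wrapper when needed; for example (a sketch, with the attribute names assumed from the descriptions above):

console.log(consumer.queue.length()); // fastq queues expose a length() method
console.log(consumer.consumer.ready); // HighLevelConsumer state flag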
status: get the current status of the consumer and queue, a snapshot like the following:
{
  "consumer": {
    "groupId": "my-consumer-group",
    "initialized": false,
    "ready": true,
    "closing": false,
    "paused": false,
    "rebalancing": false,
    "topicPayloads": [
      { "topic": "my-topic", "partition": "6", "offset": 39, "maxBytes": 1048576, "metadata": "m" },
      { "topic": "my-topic", "partition": "7", "offset": 19, "maxBytes": 1048576, "metadata": "m" },
      { "topic": "my-topic", "partition": "8", "offset": 16, "maxBytes": 1048576, "metadata": "m" },
      { "topic": "my-topic", "partition": "9", "offset": 28, "maxBytes": 1048576, "metadata": "m" },
      { "topic": "my-topic", "partition": "10", "offset": 14, "maxBytes": 1048576, "metadata": "m" },
      { "topic": "my-topic", "partition": "11", "offset": 33, "maxBytes": 1048576, "metadata": "m" }
    ]
  },
  "queue": {
    "idle": true,
    "length": 0
  },
  "status": "up"
}
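A status check might look like this (assuming status is exposed as a method; consult the source for the exact accessor):

console.log(JSON.stringify(consumer.status(), null, 2)); // prints a snapshot like the above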
registerWorker(worker): register a new worker function
name | type | description |
---|---|---|
worker | Function | a function worker(parsed) => Promise that is passed every (valid) message for processing |
consumer.registerWorker((parsed) => {
  return Promise.resolve();
});
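In practice a worker returns a promise for its actual work: resolving counts as success, while rejecting fires message:error (see the events below). For example (saveToDatabase is a hypothetical handler):

consumer.registerWorker((parsed) => {
  // a rejected promise here is emitted as a message:error event
  return saveToDatabase(parsed);
});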
The TopicConsumer extends the EventEmitter class and emits the following lifecycle events:
event | description |
---|---|
consumer:closing-error | fired (err) when all attempts to close a failed consumer have failed |
consumer:commit-error | fired (err) when an error is encountered committing offsets |
consumer:connecting | fired when a new consumer instance is waiting to connect/register |
consumer:error | fired (err) anytime the underlying consumer emits an error |
consumer:offset-out-of-range | fired when underlying consumer encounters an OffsetOutOfRangeError |
consumer:pausing | fired when first message is pushed into queue and underlying consumer is paused |
consumer:rebuild-initiated | fired when the rebuild process has been initiated |
consumer:rebuild-scheduled | fired (delayInSeconds) when the rebuild has been scheduled |
consumer:rebuild-started | fired when the rebuild has started |
consumer:resuming | fired when last task in queue has been processed and underlying consumer is resuming |
consumer:starting | fired after a new consumer has registered and is beginning to fetch messages |
message:processing | fired (parsed) when the queue has started processing a message |
message:skipped | fired (parsed, reason) when a message fails validation |
message:success | fired (parsed, results) when a message has been successfully processed |
message:error | fired (err, parsed) when a worker rejects |
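For example, you can subscribe to these to trace the rebuild lifecycle and skipped messages (event names and payloads as listed above):

consumer.on('consumer:rebuild-scheduled', (delayInSeconds) => {
  console.log(`consumer rebuild scheduled in ${delayInSeconds}s`);
});
consumer.on('consumer:rebuild-started', () => {
  console.log('consumer rebuild started');
});
consumer.on('message:skipped', (parsed, reason) => {
  console.warn('message failed validation', reason);
});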
Testing requires docker 1.8+ and docker-compose 1.12+:
docker-compose up
To contribute:

1. Create your feature branch (git checkout -b my-new-feature)
2. Commit your changes (git commit -am 'Add some feature')
3. Push to the branch (git push origin my-new-feature)

Copyright (c) 2016 Gaia
Licensed under the MIT license