kafka-node-topic-consumer

A wrapper around kafka-node's HighLevelConsumer that provides error handling and message-processing concurrency control via fastq (a lightweight, fast queue modeled after async's queue).

Installing

npm install --save kafka-node kafka-node-topic-consumer

Purpose

There are two main motivations for this module:

  • There are known issues with the high-level consumer API in kafka 0.8. Starting consumers too quickly after a failure, or too close in time to another member of the same group, often causes rebalancing problems. To alleviate this, the TopicConsumer self-heals when the underlying HighLevelConsumer encounters an error: it first attempts to close the existing consumer, then removes it and schedules a rebuild at a random time in the near future (30-90 seconds). The rebuild process is infinite; if a rebuild fails, the healing process starts over.
  • Although kafka guarantees ordering within a partition, kafka-node's HighLevelConsumer resembles a firehose, emitting messages as soon as they arrive, regardless of how fast the application can process them. To control this, the TopicConsumer implements an in-memory queue that processes a single batch of messages at a time. As soon as the underlying consumer emits the first message of a newly received batch, it pauses the consumer and pushes all messages into the queue. Once the last message has been processed, it resumes consuming messages (see the sketch after this list).
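
The pause/push/resume cycle described above can be sketched with kafka-node and fastq directly. The following is a simplified, hypothetical illustration, not the module's actual implementation; handleMessage and rebuildConsumer are placeholder functions.

import kafka from 'kafka-node';
import fastq from 'fastq';

const client = new kafka.Client(process.env.ZOOKEEPER_HOST);
const consumer = new kafka.HighLevelConsumer(client, [{ topic: 'my-topic' }], {
  groupId: 'my-consumer-group',
});

// process queued messages one at a time; done() tells fastq the task is finished
const queue = fastq((msg, done) => {
  handleMessage(msg).then(() => done(null), done); // handleMessage is hypothetical
}, 1);

consumer.on('message', (msg) => {
  // on the first message of a new batch, pause the firehose, then queue everything
  if (queue.idle()) consumer.pause();
  queue.push(msg, () => {});
});

// once the last queued message has been processed, resume fetching
queue.drain = () => consumer.resume();

consumer.on('error', () => {
  // close the failed consumer and schedule a rebuild at a random time (30-90 seconds)
  consumer.close(true, () => {
    setTimeout(rebuildConsumer, (30 + Math.random() * 60) * 1000); // rebuildConsumer is hypothetical
  });
});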

Getting Started

import TopicConsumer from 'kafka-node-topic-consumer';

// create a new TopicConsumer
const consumer = new TopicConsumer({
  host: process.env.ZOOKEEPER_HOST,
  consumer: { groupId: 'my-consumer-group' },
  topic: 'my-topic',
});

consumer.registerWorker((msg) => {
  console.log(msg);
  return Promise.resolve();
});

consumer.on('message:error', (err, msg) => {
  console.error(err, msg);
});

consumer.connect();

API

constructor(options) => TopicConsumer

instantiate a new topic consumer

Params
name | type | description
options | Object | constructor options
[options.concurrency] | Number | number of tasks to be processed at any given time, default is 1
options.consumer | Object | consumer options
options.consumer.groupId | String | consumer group id
options.host | String | zookeeper connection string
[options.parse] | Function | a function (raw) => Promise for parsing raw kafka messages before they are pushed into the queue. The default parse function will attempt to parse the raw message's value attribute as UTF-8 stringified JSON and add it as the parsedValue attribute on the message
[options.rebuild] | Object | rebuild configuration
[options.rebuild.closing] | Object | valid retry options for closing failed consumers
[options.rebuild.maxDelay] | Number, String | the maximum time to wait before rebuilding, default is 2m
[options.rebuild.minDelay] | Number, String | the minimum time to wait before rebuilding, default is 35s
options.topic | String, Object | topic name or payload
[options.validate] | Function | a function (parsed) => Promise for validating queue messages. Messages that fail validation will not be processed by workers
Example
import Bluebird from 'bluebird';
import joi from 'joi';
import TopicConsumer from 'kafka-node-topic-consumer';

const consumer = new TopicConsumer({
  host: process.env.ZOOKEEPER_HOST,
  consumer: {
    groupId: 'my-group-id'
  },
  topic: 'my-topic',
  parse(raw) {
    return Bluebird.try(() => {
      return JSON.parse(raw.value.toString('utf8'));
    });
  },
  validate(parsed) {
    const schema = joi.object({
      id: joi.string().guid().required(),
      action: joi.string().valid('create', 'destroy', 'update').required(),
      data: joi.object().required(),
    });
    const result = joi.validate(parsed, schema);
    if (result.error) {
      return Promise.reject(result.error);
    }
    return Promise.resolve(result.value);
  },
});
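
The concurrency and rebuild options from the table above can be combined with parse and validate. A minimal sketch, assuming the delay values accept either milliseconds or duration strings as the defaults ('2m', '35s') suggest:

const tunedConsumer = new TopicConsumer({
  host: process.env.ZOOKEEPER_HOST,
  consumer: { groupId: 'my-group-id' },
  topic: 'my-topic',
  concurrency: 5,     // process up to 5 queued messages at a time (default is 1)
  rebuild: {
    minDelay: '35s',  // wait at least 35 seconds before rebuilding a failed consumer
    maxDelay: '2m',   // and at most 2 minutes
  },
});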

connect([done]) => Promise

Wait for a new consumer to register

Params
name | type | description
done | Function | optional callback
Example
consumer.connect(err => {});

consumer.connect()
.then(() => {})
.catch(err => {});

consumer

the underlying HighLevelConsumer instance

queue

the underlying queue instance

getStatus() => Object

get current status

Returns
{
  "consumer": {
    "groupId": "my-consumer-group",
    "initialized": false,
    "ready": true,
    "closing": false,
    "paused": false,
    "rebalancing": false,
    "topicPayloads": [
      {
        "topic": "my-topic",
        "partition": "6",
        "offset": 39,
        "maxBytes": 1048576,
        "metadata": "m"
      },
      {
        "topic": "my-topic",
        "partition": "7",
        "offset": 19,
        "maxBytes": 1048576,
        "metadata": "m"
      },
      {
        "topic": "my-topic",
        "partition": "8",
        "offset": 16,
        "maxBytes": 1048576,
        "metadata": "m"
      },
      {
        "topic": "my-topic",
        "partition": "9",
        "offset": 28,
        "maxBytes": 1048576,
        "metadata": "m"
      },
      {
        "topic": "my-topic",
        "partition": "10",
        "offset": 14,
        "maxBytes": 1048576,
        "metadata": "m"
      },
      {
        "topic": "my-topic",
        "partition": "11",
        "offset": 33,
        "maxBytes": 1048576,
        "metadata": "m"
      }
    ]
  },
  "queue": {
    "idle": true,
    "length": 0
  },
  "status": "up"
}
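
The returned status can be used as a simple health check. For example (the health-check wiring itself is not part of this module):

// warn when the consumer reports anything other than "up"
const status = consumer.getStatus();
if (status.status !== 'up' || !status.consumer.ready) {
  console.warn('kafka consumer unhealthy', status);
}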

registerWorker(worker)

register a new worker function

Params
name | type | description
worker | Function | a function worker(parsed) => Promise that is passed every (valid) message for processing
Example
consumer.registerWorker(parsed => {
  return Promise.resolve();
});
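
A worker's resolved value is passed along via the message:success event and a rejection via message:error (see the table below), so failures can be handled in one place. A sketch, where saveToDatabase is a hypothetical async function:

consumer.registerWorker((parsed) => {
  // any rejection here is emitted as message:error (err, parsed)
  return saveToDatabase(parsed).then((record) => {
    return record; // resolved values are included in message:success (parsed, results)
  });
});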

Events

The TopicConsumer extends the EventEmitter class and emits the following lifecycle events:

event | description
consumer:closing-error | fired (err) when all attempts to close a failed consumer have failed
consumer:commit-error | fired (err) when an error is encountered committing offsets
consumer:connecting | fired when a new consumer instance is waiting to connect/register
consumer:error | fired (err) anytime the underlying consumer emits an error
consumer:offset-out-of-range | fired when the underlying consumer encounters an OffsetOutOfRangeError
consumer:pausing | fired when the first message is pushed into the queue and the underlying consumer is paused
consumer:rebuild-initiated | fired when the rebuild process has been initiated
consumer:rebuild-scheduled | fired (delayInSeconds) when the rebuild has been scheduled
consumer:rebuild-started | fired when the rebuild has started
consumer:resuming | fired when the last task in the queue has been processed and the underlying consumer is resuming
consumer:starting | fired after a new consumer has registered and is beginning to fetch messages
message:processing | fired (parsed) when the queue has started processing a message
message:skipped | fired (parsed, reason) when a message fails validation
message:success | fired (parsed, results) when a message has been successfully processed
message:error | fired (err, parsed) when a worker rejects
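
These lifecycle events are useful hooks for logging and alerting on consumer health, for example:

consumer.on('consumer:error', (err) => console.error('consumer error', err));
consumer.on('consumer:rebuild-scheduled', (delayInSeconds) => {
  console.warn(`consumer rebuild scheduled in ${delayInSeconds}s`);
});
consumer.on('message:skipped', (parsed, reason) => {
  console.warn('message failed validation', reason);
});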

Testing

Requires docker 1.8+ and docker-compose 1.12+

docker-compose up

Contributing

  • Fork it
  • Create your feature branch (git checkout -b my-new-feature)
  • Commit your changes (git commit -am 'Add some feature')
  • Push to the branch (git push origin my-new-feature)
  • Create a new Pull Request

License

Copyright (c) 2016 Gaia

Licensed under the MIT license
