
# Sbmt-KafkaConsumer
This gem consumes Kafka messages. It is a wrapper around the Karafka gem and is recommended for use as a transport with the sbmt-outbox gem.
## Installation
Add this line to your application's Gemfile:
```ruby
gem "sbmt-kafka_consumer"
```
And then execute:
```shell
bundle install
```
## Demo
Learn how to use this gem and how it works with Ruby on Rails in the demo apps: https://github.com/Kuper-Tech/outbox-example-apps
## Auto configuration
We recommend going through the configuration and file creation process using the following Rails generators. Each generator can be run with the `--help` option to learn more about the available arguments.
### Initial configuration
If you are plugging the gem into your application for the first time, you can generate the initial configuration:
```shell
rails g kafka_consumer:install
```
As a result, the `config/kafka_consumer.yml` file will be created.
### Consumer class
A consumer class can be generated with the following command:
```shell
rails g kafka_consumer:consumer MaybeNamespaced::Name
```
### Inbox consumer
To generate an Inbox consumer for use with the sbmt-outbox gem, run the following command:
```shell
rails g kafka_consumer:inbox_consumer MaybeNamespaced::Name some-consumer-group some-topic
```
## Manual configuration
The `config/kafka_consumer.yml` file is the main configuration for the gem.
Example config with a full set of options:
```yaml
default: &default
  client_id: "my-app-consumer"
  concurrency: 4
  max_wait_time: 1
  shutdown_timeout: 60
  pause_timeout: 1
  pause_max_timeout: 30
  pause_with_exponential_backoff: true
  partition_assignment_strategy: cooperative-sticky
  auth:
    kind: plaintext
  kafka:
    servers: "kafka:9092"
    heartbeat_timeout: 5
    session_timeout: 30
    reconnect_timeout: 3
    connect_timeout: 5
    socket_timeout: 30
    kafka_options:
      allow.auto.create.topics: true
  probes:
    port: 9394
    endpoints:
      readiness:
        enabled: true
        path: "/readiness"
      liveness:
        enabled: true
        path: "/liveness"
        timeout: 15
        max_error_count: 15
  metrics:
    port: 9090
    path: "/metrics"
  consumer_groups:
    group_ref_id_1:
      name: cg_with_single_topic
      topics:
        - name: topic_with_inbox_items
          consumer:
            klass: "Sbmt::KafkaConsumer::InboxConsumer"
            init_attrs:
              name: "test_items"
              inbox_item: "TestInboxItem"
          deserializer:
            klass: "Sbmt::KafkaConsumer::Serialization::NullDeserializer"
          kafka_options:
            auto.offset.reset: latest
    group_ref_id_2:
      name: cg_with_multiple_topics
      topics:
        - name: topic_with_json_data
          consumer:
            klass: "SomeConsumer"
          deserializer:
            klass: "Sbmt::KafkaConsumer::Serialization::JsonDeserializer"
        - name: topic_with_protobuf_data
          consumer:
            klass: "SomeConsumer"
          deserializer:
            klass: "Sbmt::KafkaConsumer::Serialization::ProtobufDeserializer"
            init_attrs:
              message_decoder_klass: "SomeDecoder"
              skip_decoding_error: true

development:
  <<: *default

test:
  <<: *default
  deliver: false

production:
  <<: *default
```
### `auth` config section
The gem supports two authentication variants: plaintext (the default) and SASL plaintext.

SASL plaintext example:
```yaml
auth:
  kind: sasl_plaintext
  sasl_username: user
  sasl_password: pwd
  sasl_mechanism: SCRAM-SHA-512
```
### `kafka` config section
The `servers` key is required and should be in rdkafka format: without the `kafka://` prefix, for example: `srv1:port1,srv2:port2,...`.
The `kafka_options` section may contain any rdkafka option. Also, `kafka_options` may be redefined for each topic.
Please note that the `partition.assignment.strategy` option within `kafka_options` is not supported for topics; use the global `partition_assignment_strategy` option instead.
### `consumer_groups` config section
```yaml
consumer_groups:
  group_id:
    name: some_group_name
    topics:
      - name: some_topic_name
        active: true
        consumer:
          klass: SomeConsumerClass
          init_attrs:
            key: value
        deserializer:
          klass: SomeDeserializerClass
          init_attrs:
            key: value
        kafka_options:
          auto.offset.reset: latest
```
### `consumer.init_attrs` options for `BaseConsumer`
- `skip_on_error` - optional, default `false`: ignore consumer errors during message processing and commit the offset to Kafka
- `middlewares` - optional, default `[]`, type `Array` of `String`: add middlewares that run before message processing
```yaml
init_attrs:
  middlewares: ['SomeMiddleware']
```
```ruby
class SomeMiddleware
  def call(message)
    yield if message.payload.id.to_i % 2 == 0
  end
end
```
CAUTION:
- ⚠️ `yield` is mandatory for all middlewares, as it returns control to the `process_message` method.
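Outside the gem, the middleware contract can be sketched with plain Ruby. This is a self-contained illustration (the `EvenIdOnlyMiddleware` name and hash-shaped payload are assumptions for the demo, not the gem's API): calling `yield` passes the message on to processing, while skipping `yield` drops it.

```ruby
# Self-contained sketch of the middleware contract (illustrative names,
# not the gem's classes): yield hands control back to message processing.
Message = Struct.new(:payload)

class EvenIdOnlyMiddleware
  def call(message)
    # Only messages with an even id reach the processing block
    yield if message.payload[:id].to_i % 2 == 0
  end
end

processed = []
[Message.new({id: 1}), Message.new({id: 2}), Message.new({id: 4})].each do |msg|
  EvenIdOnlyMiddleware.new.call(msg) { processed << msg.payload[:id] }
end
processed # => [2, 4]
```

The message with `id: 1` never reaches the block because its middleware returned without yielding.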
### `consumer.init_attrs` options for `InboxConsumer`
- `inbox_item` - required: name of the inbox item class
- `event_name` - optional, default `nil`: used when the inbox item keeps several event types
- `skip_on_error` - optional, default `false`: ignore consumer errors during message processing and commit the offset to Kafka
- `middlewares` - optional, default `[]`, type `Array` of `String`: add middlewares that run before message processing
```yaml
init_attrs:
  middlewares: ['SomeMiddleware']
```
```ruby
class SomeMiddleware
  def call(message)
    yield if message.payload.id.to_i % 2 == 0
  end
end
```
CAUTION:
- ⚠️ `yield` is mandatory for all middlewares, as it returns control to the `process_message` method.
- ⚠️ Doesn't work with `process_batch`.
### `deserializer.init_attrs` options
- `skip_decoding_error` - don't raise an exception when the message cannot be deserialized
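As an illustration of these semantics, here is a standalone sketch (`SketchJsonDeserializer` is a hypothetical class for demonstration, not the gem's deserializer): with the flag enabled, a decoding failure yields `nil` instead of raising.

```ruby
require "json"

# Standalone sketch of skip_decoding_error semantics (illustrative class,
# not part of the gem): swallow parse errors only when the flag is set.
class SketchJsonDeserializer
  def initialize(skip_decoding_error: false)
    @skip_decoding_error = skip_decoding_error
  end

  def call(raw_payload)
    JSON.parse(raw_payload)
  rescue JSON::ParserError
    raise unless @skip_decoding_error
    nil # broken message is skipped instead of crashing the consumer
  end
end

SketchJsonDeserializer.new(skip_decoding_error: true).call("not json") # => nil
```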
### `probes` config section
In Kubernetes, probes are mechanisms used to assess the health of your application running within a container.
```yaml
probes:
  port: 9394
  endpoints:
    liveness:
      enabled: true
      path: /liveness
      timeout: 10
    readiness:
      enabled: true
      path: /readiness/kafka_consumer
```
### `metrics` config section
We use Yabeda to collect all kinds of metrics.
```yaml
metrics:
  port: 9090
  path: /metrics
```
## Kafkafile
You can create a `Kafkafile` in the root of your app to configure additional settings for your needs.
Example:
```ruby
require_relative "config/environment"

# some-extra-configuration
```
## Process batch
To process messages in batches, add the `process_batch` method to the consumer:
```ruby
class SomeConsumer < Sbmt::KafkaConsumer::BaseConsumer
  def process_batch(messages)
    # batch processing code
  end
end
```
CAUTION:
- ⚠️ Inbox does not support batch insertion.
- ⚠️ If you want to use this feature, you need to process the batch atomically (e.g., insert it into ClickHouse in a single request).
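One way to satisfy the atomicity requirement is to turn the whole batch into a single write. A hedged sketch follows (`EventsConsumer`, `Msg`, and `FakeWarehouseClient` are illustrative stand-ins; in a real app the consumer would subclass the gem's `BaseConsumer` and the client would be, say, a ClickHouse driver):

```ruby
# Illustrative sketch: collect every payload from the batch and write
# them in one request, so the batch succeeds or fails as a unit.
Msg = Struct.new(:payload)

FakeWarehouseClient = Struct.new(:inserts) do
  # Stand-in for a real storage client; records each insert call
  def insert(table, rows)
    (self.inserts ||= []) << [table, rows]
  end
end

class EventsConsumer # < Sbmt::KafkaConsumer::BaseConsumer in a real app
  def initialize(client)
    @client = client
  end

  def process_batch(messages)
    rows = messages.map(&:payload)
    @client.insert("events", rows) # one request for the whole batch
  end
end

client = FakeWarehouseClient.new
EventsConsumer.new(client).process_batch([Msg.new({id: 1}), Msg.new({id: 2})])
client.inserts # => [["events", [{id: 1}, {id: 2}]]]
```

Because all rows go out in a single request, a failure leaves the offset uncommitted and the whole batch is retried, rather than leaving it half-written.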
## CLI
Run the following command to start a consumer server:
```shell
kafka_consumer -g some_group_id_1 -g some_group_id_2 -c 5
```
Where:
- `-g` - `group`: a consumer group id; if not specified, all groups from the config will be processed
- `-c` - `concurrency`: the number of threads; default is 4
### `concurrency` argument

See Karafka's Concurrency and Multithreading documentation for details.
Don't forget to properly calculate and set the size of the ActiveRecord connection pool:
- each thread will utilize one db connection from the pool
- an application can have monitoring threads which can use db connections from the pool
Also pay attention to the number of server processes:
- `number_of_processes x concurrency` for topics with high data intensity can be equal to the number of partitions of the consumed topic
- `number_of_processes x concurrency` for topics with low data intensity can be less than the number of partitions of the consumed topic
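The sizing rules above amount to simple arithmetic. A back-of-the-envelope check (all numbers here are illustrative, not values the gem reads):

```ruby
# Illustrative sizing arithmetic for pool size and partition coverage.
kafka_concurrency   = 4 # the `concurrency` setting / `-c` CLI argument
monitoring_threads  = 2 # e.g. monitoring threads that may borrow a DB connection
required_pool_size  = kafka_concurrency + monitoring_threads # => 6

number_of_processes = 3
partitions          = 12
# For a high-intensity topic, processes x concurrency can match partitions:
number_of_processes * kafka_concurrency == partitions # => true
```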
## Testing

To test your consumer with RSpec, please use the provided shared context.

For a single payload:
```ruby
require "sbmt/kafka_consumer/testing"

RSpec.describe OrderCreatedConsumer do
  include_context "with sbmt karafka consumer"

  it "works" do
    publish_to_sbmt_karafka(payload, deserializer: deserializer)
    expect { consume_with_sbmt_karafka }.to change(Order, :count).by(1)
  end
end
```
For a batch of payloads:
```ruby
require "sbmt/kafka_consumer/testing"

RSpec.describe OrderCreatedConsumer do
  include_context "with sbmt karafka consumer"

  it "works" do
    publish_to_sbmt_karafka_batch(payloads, deserializer: deserializer)
    expect { consume_with_sbmt_karafka }.to change(Order, :count).by(1)
  end
end
```
## Development
- Prepare environment: `dip provision`
- Run tests: `dip rspec`
- Run linter: `dip rubocop`
- Run Kafka server: `dip up`
- Run consumer server: `dip kafka-consumer`