
This gem is used to consume Kafka messages. It is a wrapper over the Karafka gem, and is recommended for use as a transport with the sbmt-outbox gem.
Add this line to your application's Gemfile:
gem "sbmt-kafka_consumer"
And then execute:
bundle install
Learn how to use this gem and how it works with Ruby on Rails here: https://github.com/Kuper-Tech/outbox-example-apps
We recommend going through the configuration and file creation process using the following Rails generators. Run any generator with the --help option to learn more about its available arguments.
If you plug the gem into your application for the first time, you can generate the initial configuration:
rails g kafka_consumer:install
As a result, the config/kafka_consumer.yml file will be created.
A consumer class can be generated with the following command:
rails g kafka_consumer:consumer MaybeNamespaced::Name
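The generator creates a consumer class under app/consumers. As a minimal sketch of what such a class might look like, assuming it inherits from Sbmt::KafkaConsumer::BaseConsumer and implements the process_message method referenced later in this README (the file path and method body are illustrative):
# app/consumers/maybe_namespaced/name.rb (illustrative path)
module MaybeNamespaced
  class Name < Sbmt::KafkaConsumer::BaseConsumer
    def process_message(message)
      # message.payload holds the deserialized payload; the logging here is only a placeholder
      Rails.logger.info("Consumed message: #{message.payload.inspect}")
    end
  end
end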
To generate an Inbox consumer for use with the sbmt-outbox gem, run the following command:
rails g kafka_consumer:inbox_consumer MaybeNamespaced::Name some-consumer-group some-topic
The config/kafka_consumer.yml file is the main configuration for the gem.
Example config with a full set of options:
default: &default
  client_id: "my-app-consumer"
  concurrency: 4 # max number of threads
  # optional Karafka options
  max_wait_time: 1
  shutdown_timeout: 60
  pause_timeout: 1
  pause_max_timeout: 30
  pause_with_exponential_backoff: true
  partition_assignment_strategy: cooperative-sticky
  auth:
    kind: plaintext
  kafka:
    servers: "kafka:9092"
    # optional Kafka options
    heartbeat_timeout: 5
    session_timeout: 30
    reconnect_timeout: 3
    connect_timeout: 5
    socket_timeout: 30
    kafka_options:
      allow.auto.create.topics: true
  probes: # optional section
    port: 9394
    endpoints:
      readiness:
        enabled: true
        path: "/readiness"
      liveness:
        enabled: true
        path: "/liveness"
        timeout: 300
        max_error_count: 15 # default 10
  metrics: # optional section
    port: 9090
    path: "/metrics"
  consumer_groups:
    group_ref_id_1:
      name: cg_with_single_topic
      topics:
        - name: topic_with_inbox_items
          consumer:
            klass: "Sbmt::KafkaConsumer::InboxConsumer"
            init_attrs:
              name: "test_items"
              inbox_item: "TestInboxItem"
          deserializer:
            klass: "Sbmt::KafkaConsumer::Serialization::NullDeserializer"
          kafka_options:
            auto.offset.reset: latest
    group_ref_id_2:
      name: cg_with_multiple_topics
      topics:
        - name: topic_with_json_data
          consumer:
            klass: "SomeConsumer"
          deserializer:
            klass: "Sbmt::KafkaConsumer::Serialization::JsonDeserializer"
        - name: topic_with_protobuf_data
          consumer:
            klass: "SomeConsumer"
          deserializer:
            klass: "Sbmt::KafkaConsumer::Serialization::ProtobufDeserializer"
            init_attrs:
              message_decoder_klass: "SomeDecoder"
              skip_decoding_error: true

development:
  <<: *default
test:
  <<: *default
  deliver: false
production:
  <<: *default
auth config section
The gem supports 2 variants: plaintext (default) and SASL-plaintext.
SASL-plaintext:
auth:
  kind: sasl_plaintext
  sasl_username: user
  sasl_password: pwd
  sasl_mechanism: SCRAM-SHA-512
kafka config section
The servers key is required and should be in rdkafka format: without the kafka:// prefix, for example: srv1:port1,srv2:port2,...
The kafka_options section may contain any rdkafka option. Also, kafka_options may be redefined for each topic.
Please note that the partition.assignment.strategy option within kafka_options is not supported for topics; instead, use the global partition_assignment_strategy option.
consumer_groups config section
consumer_groups:
  # group id can be used when starting a consumer process (see CLI section below)
  group_id:
    name: some_group_name # required
    topics:
      - name: some_topic_name # required
        active: true # optional, default true
        consumer:
          klass: SomeConsumerClass # required, a consumer class inherited from Sbmt::KafkaConsumer::BaseConsumer
          init_attrs: # optional, consumer class attributes (see below)
            key: value
        deserializer:
          klass: SomeDeserializerClass # optional, default NullDeserializer, a deserializer class inherited from Sbmt::KafkaConsumer::Serialization::NullDeserializer
          init_attrs: # optional, deserializer class attributes (see below)
            key: value
        kafka_options: # optional, this section allows you to redefine the root rdkafka options for each topic
          auto.offset.reset: latest
consumer.init_attrs options for BaseConsumer
- skip_on_error - optional, default false, omit consumer errors in message processing and commit the offset to Kafka
- middlewares - optional, default [], type String, add middleware before message processing

init_attrs:
  middlewares: ['SomeMiddleware']

class SomeMiddleware
  def call(message)
    yield if message.payload.id.to_i % 2 == 0
  end
end
CAUTION: yield is mandatory for all middleware, as it returns control to the process_message method.

consumer.init_attrs options for InboxConsumer
- inbox_item - required, name of the inbox item class
- event_name - optional, default nil, used when the inbox item keeps several event types
- skip_on_error - optional, default false, omit consumer errors in message processing and commit the offset to Kafka
- middlewares - optional, default [], type String, add middleware before message processing

init_attrs:
  middlewares: ['SomeMiddleware']

class SomeMiddleware
  def call(message)
    yield if message.payload.id.to_i % 2 == 0
  end
end
CAUTION: yield is mandatory for all middleware, as it returns control to the process_message method.

deserializer.init_attrs options
- skip_decoding_error - don't raise an exception when the message cannot be deserialized
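A custom deserializer class can be supplied via the klass option. Below is a minimal sketch only; it assumes the Karafka-style call(message) interface with the raw bytes available via message.raw_payload, which is not confirmed by this README, so verify against Sbmt::KafkaConsumer::Serialization::NullDeserializer before relying on it:
# an illustrative sketch, not the gem's documented API
require "json"

class MyJsonDeserializer < Sbmt::KafkaConsumer::Serialization::NullDeserializer
  # assumed interface: receives the raw Karafka message and returns the deserialized payload
  def call(message)
    JSON.parse(message.raw_payload, symbolize_names: true)
  end
end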
probes config section
In Kubernetes, probes are mechanisms used to assess the health of your application running within a container.
probes:
  port: 9394 # optional, default 9394
  endpoints:
    liveness:
      enabled: true # optional, default true
      path: /liveness # optional, default "/liveness"
      timeout: 10 # optional, default 10, timeout in seconds after which the group is considered dead
    readiness:
      enabled: true # optional, default true
      path: /readiness/kafka_consumer # optional, default "/readiness/kafka_consumer"
metrics config section
We use Yabeda to collect all kinds of metrics.
metrics:
  port: 9090 # optional, default is probes.port
  path: /metrics # optional, default "/metrics"
Kafkafile
You can create a Kafkafile in the root of your app to configure additional settings for your needs.
Example:
require_relative "config/environment"
# some-extra-configuration
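For illustration, a slightly fuller Kafkafile might boot the Rails environment and then load app-specific consumer helpers; the glob path below is purely hypothetical:
# Kafkafile
require_relative "config/environment"

# load extra consumer-related code that is not autoloaded by Rails (hypothetical path)
Dir[Rails.root.join("lib", "kafka", "**", "*.rb")].sort.each { |file| require file }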
Process batch
To process messages in batches, you need to add the process_batch method to the consumer:
# app/consumers/some_consumer.rb
class SomeConsumer < Sbmt::KafkaConsumer::BaseConsumer
  def process_batch(messages)
    # some code
  end
end
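For illustration only, a batch handler might simply iterate over the received Karafka messages; the Order model and its attribute below are assumptions borrowed from the testing examples further down:
# app/consumers/order_created_consumer.rb (illustrative)
class OrderCreatedConsumer < Sbmt::KafkaConsumer::BaseConsumer
  def process_batch(messages)
    messages.each do |message|
      # message.payload is the deserialized payload of a single Kafka message;
      # persisting it to an Order record is just a placeholder for real handling
      Order.create!(payload: message.payload)
    end
  end
end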
CAUTION:
CLI
Run the following command to execute a server:
kafka_consumer -g some_group_id_1 -g some_group_id_2 -c 5
Where:
-g - group, a consumer group id; if not specified, all groups from the config will be processed
-c - concurrency, the number of threads, default is 4
For more details about the concurrency argument, see Concurrency and Multithreading.
Don't forget to properly calculate and set the size of the ActiveRecord connection pool.
Also pay attention to the number of server processes:
- number_of_processes x concurrency for topics with high data intensity can be equal to the number of partitions of the consumed topic
- number_of_processes x concurrency for topics with low data intensity can be less than the number of partitions of the consumed topic

To test your consumer with RSpec, please use this shared context:
require "sbmt/kafka_consumer/testing"
RSpec.describe OrderCreatedConsumer do
include_context "with sbmt karafka consumer"
it "works" do
publish_to_sbmt_karafka(payload, deserializer: deserializer)
expect { consume_with_sbmt_karafka }.to change(Order, :count).by(1)
end
end
require "sbmt/kafka_consumer/testing"
RSpec.describe OrderCreatedConsumer do
include_context "with sbmt karafka consumer"
it "works" do
publish_to_sbmt_karafka_batch(payloads, deserializer: deserializer)
expect { consume_with_sbmt_karafka }.to change(Order, :count).by(1)
end
end
For local development and testing, the project provides dip commands:
dip provision
dip rspec
dip rubocop
dip up
dip kafka-consumer