
Sbmt-Outbox
Microservices often publish messages after a transaction has been committed. Writing to the database and publishing a message are two separate operations, so together they are not atomic: the database write can succeed while the publication fails. A failed publication could lead to a critical failure of the business process.
The Outbox pattern provides a reliable solution for message publishing. The idea behind this approach is to have an "outgoing message table" in the service's database. Before the main transaction completes, a new message row is added to this table. As a result, two actions take place as part of a single transaction. An asynchronous process retrieves new rows from the database table and, if they exist, publishes the messages to the broker.
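The flow described above can be sketched in plain Ruby. This is only an illustration of the pattern, not the gem's implementation: `FakeDatabase`, its `transaction` method, and the `broker` array are hypothetical in-memory stand-ins for a real database and message broker.

```ruby
# In-memory stand-in for a database (hypothetical, for illustration only).
class FakeDatabase
  attr_reader :orders, :outbox

  def initialize
    @orders = []
    @outbox = []
  end

  # Keeps all writes made in the block, or none of them if the block raises.
  def transaction
    orders_snapshot = @orders.dup
    outbox_snapshot = @outbox.dup
    yield
  rescue StandardError
    @orders = orders_snapshot
    @outbox = outbox_snapshot
    raise
  end
end

db = FakeDatabase.new
broker = [] # published messages end up here

# The business write and the outbox row are committed together.
db.transaction do
  db.orders << {id: 1, state: "created"}
  db.outbox << {event_key: 1, payload: "order 1 created", delivered: false}
end

# A failing transaction leaves neither an order nor an outbox row behind.
begin
  db.transaction do
    db.orders << {id: 2, state: "created"}
    db.outbox << {event_key: 2, payload: "order 2 created", delivered: false}
    raise "validation failed"
  end
rescue RuntimeError
  # Both writes were rolled back.
end

# Asynchronous relay: publish pending rows, then mark them as delivered.
db.outbox.reject { |row| row[:delivered] }.each do |row|
  broker << row[:payload]
  row[:delivered] = true
end
```

Because the outbox row only exists when the business transaction commits, the relay never publishes a message for a change that was rolled back.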
Installation
Add this line to your application's Gemfile:
gem "sbmt-outbox"
And then execute:
bundle install
Demo
Learn how to use this gem and how it works with Ruby on Rails here: https://github.com/SberMarket-Tech/outbox-example-apps
Auto configuration
We recommend going through the configuration and file creation process using the following Rails generators.
Each generator can be run with the `--help` option to learn more about the available arguments.
Initial configuration
If you plug the gem into your application for the first time, you can generate the initial configuration:
rails g outbox:install
Outbox/inbox items creation
An ActiveRecord model can be generated for the outbox/inbox item like this:
rails g outbox:item MaybeNamespaced::SomeOutboxItem --kind outbox
rails g outbox:item MaybeNamespaced::SomeInboxItem --kind inbox
As a result, a migration and a model will be created, and the `outbox.yml` file will be configured.
Transport creation
A transport is a class that is invoked while processing a specific outbox or inbox item. The transport must return either a boolean value or a dry monad result.
rails g outbox:transport MaybeNamespaced::SomeOutboxItem some/transport/name --kind outbox
rails g outbox:transport MaybeNamespaced::SomeInboxItem some/transport/name --kind inbox
Usage
To create an Outbox item, you should call the Interactor with the Item Model Class and event_key
as arguments. The latter will be the Partitioning Key.
transaction do
some_record.save!
result = Sbmt::Outbox::CreateOutboxItem.call(
MyOutboxItem,
event_key: some_record.id,
attributes: {
payload: some_record.generate_payload,
options: {
key: some_record.id,
headers: {'FOO_BAR' => 'baz'}
}
}
)
raise result.failure unless result.success?
end
To create multiple Outbox items in a batch, call the Interactor with the Item Model Class and batch attributes. Every item should have the same list of keys and should include an `event_key` element, which will be the Partitioning Key.
transaction do
some_record.save!
another_record.save!
result = Sbmt::Outbox::CreateOutboxBatch.call(
MyOutboxItem,
batch_attributes: [
{
event_key: some_record.id,
payload: some_record.generate_payload,
options: {
key: some_record.id,
headers: {'FOO_BAR' => 'baz'}
}
},
{
event_key: another_record.id,
payload: another_record.generate_payload,
options: {
key: another_record.id,
headers: {'FOO_BAR' => 'baz'}
}
}
]
)
raise result.failure unless result.success?
end
Monitoring
We use Yabeda to collect all kinds of metrics.
Example of a Grafana dashboard that you can import from a file:

Full picture
Manual configuration
Outboxfile
First of all you should create an Outboxfile
at the root of your application with the following code:
require_relative "config/environment"
Yabeda::Prometheus::Exporter.start_metrics_server!
config/initializers/outbox.rb
The `config/initializers/outbox.rb` file contains the general configuration.
Rails.application.config.outbox.tap do |config|
config.redis = {url: ENV.fetch("REDIS_URL")}
config.paths << Rails.root.join("config/outbox.yml").to_s
config.poller = ActiveSupport::OrderedOptions.new.tap do |pc|
pc.concurrency = 6
pc.threads_count = 1
pc.general_timeout = 60
pc.regular_items_batch_size = 200
pc.retryable_items_batch_size = 100
pc.tactic = "default"
pc.rate_limit = 60
pc.rate_interval = 60
pc.min_queue_size = 10
pc.max_queue_size = 100
pc.min_queue_timelag = 5
pc.queue_delay = 0.1
end
config.processor = ActiveSupport::OrderedOptions.new.tap do |pc|
pc.threads_count = 4
pc.general_timeout = 120
pc.brpop_delay = 2
end
config.process_items.tap do |x|
x.general_timeout = 180
x.cutoff_timeout = 60
x.batch_size = 200
end
config.worker.tap do |worker|
worker.rate_limit = 10
worker.rate_interval = 60
end
end
Outbox pattern
You should create a database table in order for the process to view your outgoing messages.
create_table :my_outbox_items do |t|
t.uuid :uuid, null: false
t.string :event_name, null: false
t.string :event_key, null: false
t.integer :bucket, null: false
t.integer :status, null: false, default: 0
t.jsonb :options
t.binary :payload, null: false
t.integer :errors_count, null: false, default: 0
t.text :error_log
t.timestamp :processed_at
t.timestamps
end
add_index :my_outbox_items, :uuid, unique: true
add_index :my_outbox_items, [:status, :id, :bucket], algorithm: :concurrently, include: [:errors_count]
add_index :my_outbox_items, [:event_name, :event_key, :id]
add_index :my_outbox_items, :created_at
You can combine various types of messages within a single table. To do this, you should include an event_name
field in the table. However, this approach is only justified if it is assumed that there won't be many events, and those events will follow the same retention and retry policy.
class MyOutboxItem < Sbmt::Outbox::OutboxItem
validates :event_name, presence: true
end
outbox.yml
The outbox.yml
configuration file is the main configuration for the gem, where parameters for each outbox/inbox item are located.
default: &default
owner: foo-team
bucket_size: 16
metrics:
enabled: true
port: 9090
probes:
enabled: false
port: 5555
outbox_items:
my_outbox_item:
owner: my_outbox_item_team
retention: P1W
min_retention_period: P1D
retention_delivered_items: PT6H
delivered_min_retention_period: PT1H
deletion_batch_size: 1_000
deletion_sleep_time: 0.5
deletion_time_window: PT4H
max_retries: 3
strict_order: false
transports:
produce_message:
class: produce_message
disposable: false
topic: "my-topic-name"
development:
<<: *default
test:
<<: *default
bucket_size: 2
production:
<<: *default
bucket_size: 256
CAUTION:
- ⚠️ If the `strict_order` option is enabled and an error occurs while processing a message in a bucket, subsequent messages in that bucket won't be processed until the current message is either skipped or successfully processed
- ⚠️ `retry_strategies` and the `strict_order` option cannot be used at the same time
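The effect of `strict_order` on a single bucket can be illustrated with a small sketch. The `process_bucket` helper below is hypothetical and is not part of the gem. It assumes that a failed message simply blocks the rest of its bucket for the current pass.

```ruby
# Processes a bucket's messages in order and returns the ones that succeeded.
# With strict_order, processing stops at the first failing message.
def process_bucket(messages, strict_order:)
  processed = []
  messages.each do |message|
    if yield(message)
      processed << message
    elsif strict_order
      break
    end
  end
  processed
end

messages = [1, 2, 3]
handler = ->(message) { message != 2 } # message 2 always fails

strict = process_bucket(messages, strict_order: true, &handler)
relaxed = process_bucket(messages, strict_order: false, &handler)
# strict  => [1]     (message 3 waits behind the failed message 2)
# relaxed => [1, 3]  (message 3 overtakes the failed message 2)
```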
class ProduceMessage
def initialize(topic:)
@topic = topic
end
def call(outbox_item, payload)
true
end
end
If you use Kafka as a transport, it is recommended that you use the sbmt-kafka_producer
gem for this purpose.
Transports are defined as follows when event_name
is used:
outbox_items:
my_outbox_item:
transports:
- class: produce_message
event_name: "order_created"
topic: "order_created_topic"
- class: produce_message
event_name: "orders_completed"
topic: "orders_completed_topic"
Inbox pattern
The database migration will be the same as described in the Outbox pattern.
class MyInboxItem < Sbmt::Outbox::InboxItem
end
inbox_items:
my_inbox_item:
owner: my_inbox_item_team
retention: P1W
min_retention_period: P1D
retention_delivered_items: PT6H
delivered_min_retention_period: PT1H
deletion_batch_size: 1_000
deletion_sleep_time: 0.5
deletion_time_window: PT4H
max_retries: 3
transports:
import_order:
source: "kafka"
class ImportOrder
def initialize(source:)
@source = source
end
def call(inbox_item, payload)
true
end
end
If you use Kafka, it is recommended that you use the sbmt-kafka_consumer
gem for this purpose.
Retry strategies
The gem uses several types of retry strategies to repeat message processing if an error occurs. These strategies can be combined and will be executed one after the other. Each retry strategy takes one of three actions: to process the message, to skip processing the message or to skip processing and mark the message as "skipped" for future processing.
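How combined strategies reach a decision can be sketched as follows. The outcome names, the `decide` helper, and the two lambdas are hypothetical. The sketch also assumes that the first strategy refusing to process the message decides the outcome, which may differ from the gem's internals.

```ruby
# Hypothetical outcome names; the gem's own return values may differ.
PROCESS = :process             # go ahead and process the message
SKIP = :skip                   # do not process the message on this pass
MARK_SKIPPED = :mark_skipped   # do not process, and mark the message as "skipped"

# Runs the strategies in order; the first one that refuses processing decides.
def decide(strategies, item)
  strategies.each do |strategy|
    outcome = strategy.call(item)
    return outcome unless outcome == PROCESS
  end
  PROCESS
end

# Two illustrative strategies working on a plain hash.
wait_for_backoff = ->(item) { item[:seconds_since_last_try] < item[:delay] ? SKIP : PROCESS }
drop_if_superseded = ->(item) { item[:newer_delivered] ? MARK_SKIPPED : PROCESS }

strategies = [wait_for_backoff, drop_if_superseded]

too_early = decide(strategies, {seconds_since_last_try: 5, delay: 10, newer_delivered: true})
outdated = decide(strategies, {seconds_since_last_try: 30, delay: 10, newer_delivered: true})
ready = decide(strategies, {seconds_since_last_try: 30, delay: 10, newer_delivered: false})
# too_early => :skip, outdated => :mark_skipped, ready => :process
```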
Exponential backoff
This strategy periodically attempts to resend failed messages, with increasing delays in between each attempt.
outbox_items:
my_outbox_item:
...
minimal_retry_interval: 10
maximal_retry_interval: 600
multiplier_retry_interval: 2
retry_strategies:
- exponential_backoff
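To get a feel for the three interval settings, the sketch below computes a delay per attempt. It assumes the delay is `minimal * multiplier**attempt`, capped at the maximal interval. The gem's exact formula may differ, and `retry_delay` is a hypothetical helper.

```ruby
# Delay in seconds before retry number `attempt` (0-based), capped at `maximal`.
# Defaults mirror the configuration values shown above.
def retry_delay(attempt, minimal: 10, maximal: 600, multiplier: 2)
  [minimal * multiplier**attempt, maximal].min
end

delays = (0..7).map { |attempt| retry_delay(attempt) }
# delays => [10, 20, 40, 80, 160, 320, 600, 600]
```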
Latest available
This strategy ensures idempotency. In short, if a message fails and a later message with the same event_key has already been delivered, then you most likely do not want to re-deliver the first one when it is retried.
outbox_items:
my_outbox_item:
...
retry_strategies:
- exponential_backoff
- latest_available
The latest available strategy should be used in conjunction with the exponential backoff strategy, and it should come last to minimize the number of database queries.
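The check behind the latest available strategy can be sketched in plain Ruby. `OutboxRow` and `superseded?` are hypothetical names. The gem itself would query the database rather than scan an array.

```ruby
# Minimal stand-in for an outbox row (hypothetical).
OutboxRow = Struct.new(:id, :event_key, :status, keyword_init: true)

# True when a newer row with the same event_key has already been delivered,
# so retrying the failed row would re-deliver outdated data.
def superseded?(failed_row, all_rows)
  all_rows.any? do |other|
    other.event_key == failed_row.event_key &&
      other.id > failed_row.id &&
      other.status == :delivered
  end
end

rows = [
  OutboxRow.new(id: 1, event_key: "order-1", status: :failed),
  OutboxRow.new(id: 2, event_key: "order-1", status: :delivered),
  OutboxRow.new(id: 3, event_key: "order-2", status: :failed)
]

skip_first = superseded?(rows[0], rows) # a newer "order-1" row was delivered
skip_third = superseded?(rows[2], rows) # nothing newer exists for "order-2"
```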
Partition strategies
Depending on which type of data is used in the `event_key`, it is necessary to choose the right partitioning strategy.
Number partitioning
This strategy should be used when the `event_key` field contains a number. For example, it could be `52523` or `some-chars-123`. Any characters that aren't numbers will be removed, and only the numbers will remain. This strategy is used as a default.
outbox_items:
my_outbox_item:
...
partition_strategy: number
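A minimal sketch of number partitioning, assuming the bucket is the remaining number modulo `bucket_size`. The modulo step and the `number_bucket` helper are assumptions, not confirmed details of the gem.

```ruby
# Strips every non-digit character, then maps the number onto a bucket.
def number_bucket(event_key, bucket_size)
  event_key.to_s.delete("^0-9").to_i % bucket_size
end

from_integer = number_bucket(52523, 16)           # 52523 % 16 => 11
from_string = number_bucket("some-chars-123", 16) # 123 % 16   => 11
```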
Hash partitioning
This strategy should be used when the event_key
is a string or uuid.
outbox_items:
my_outbox_item:
...
partition_strategy: hash
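A comparable sketch for hash partitioning. It assumes the key is hashed with SHA-1 and reduced modulo `bucket_size`. The gem may use a different hash function, and `hash_bucket` is a hypothetical helper.

```ruby
require "digest/sha1"

# Hashes the key, reads the hex digest as an integer and maps it onto a bucket.
def hash_bucket(event_key, bucket_size)
  Digest::SHA1.hexdigest(event_key.to_s).to_i(16) % bucket_size
end

uuid = "0b8f2e3a-6f0d-4c58-9a4e-2d6d6c1f7a10" # arbitrary example key
bucket = hash_bucket(uuid, 16)
same_bucket = hash_bucket(uuid, 16) # the same key always lands in the same bucket
```

Since the mapping is deterministic, all messages that share an `event_key` end up in one bucket.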
Rake tasks
rake outbox:delete_items
rake outbox:update_status_items
Example run:
rake outbox:delete_items[OutboxItem,1] # Mandatory parameters box class and status
rake outbox:update_status_items[OutboxItem,0,3] # Mandatory parameters box class, current status and new status
Both tasks have optional parameters:
- start_time
- end_time
- batch_size
- sleep_time
Argument order when optional parameters are passed, followed by examples:
rake outbox:delete_items[klass_name,status,start_time,end_time,batch_size,sleep_time]
rake outbox:update_status_items[klass_name,status,new_status,start_time,end_time,batch_size,sleep_time]
rake outbox:delete_items[OutboxItem,1,"2025-01-05T23:59:59","2025-01-05T00:00:00",10_000,5]
rake outbox:update_status_items[OutboxItem,0,3,"2025-01-05T23:59:59","2025-01-05T00:00:00",10_000,5]
Concurrency
The worker process consists of a poller and a processor, each of which has its own thread pool.
The poller is responsible for fetching messages ready for processing from the database table.
The processor, in turn, is used for their consistent processing (while preserving the order of messages and the partitioning key).
Each bunch of buckets (i.e. a bucket partition) is fetched by the poller one at a time, and each bucket is processed one at a time by a processor.
A bucket is a number stored in the `bucket` column of a row. It is generated by the partitioning strategy from the `event_key` column when the message is committed to the database, and it lies within the range of zero to `bucket_size`.
The number of bucket partitions that the poller uses is 6 by default. The number of poller threads is 2 by default and is not intended for customization.
The default number of processor threads is 4 and can be configured with the --concurrency option, thereby allowing you to customize message processing performance.
This architecture was designed to allow the daemons to scale without stopping the entire system in order to avoid mixing messages chronologically.
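One way to picture bucket partitions is the sketch below, which spreads the buckets over the poller partitions by taking each bucket number modulo the partition count. This grouping rule is an assumption made for illustration, and `bucket_partitions` is a hypothetical helper. The gem may split buckets differently.

```ruby
# Splits buckets 0...bucket_size into `partition_count` groups.
def bucket_partitions(bucket_size, partition_count)
  (0...bucket_size).group_by { |bucket| bucket % partition_count }.values
end

partitions = bucket_partitions(16, 6)
# partitions => [[0, 6, 12], [1, 7, 13], [2, 8, 14], [3, 9, 15], [4, 10], [5, 11]]
```

Each bucket belongs to exactly one partition, so two pollers working on different partitions never fetch the same bucket.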
Middlewares
You can wrap item processing within middlewares. There are three types:
- client middlewares – triggered outside of a daemon; executed when an item is created
- server middlewares – triggered inside a daemon; divided into two types:
  - batch middlewares – executed when a batch is fetched from the database
  - item middlewares – executed when an item is processed
- polling middlewares – executed for an item during polling
The order of execution depends on the order specified in the outbox configuration:
Rails.application.config.outbox.tap do |config|
config.item_process_middlewares.push(
'MyFirstItemMiddleware',
'MySecondItemMiddleware'
)
end
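The effect of the configured order can be sketched in plain Ruby. `TraceMiddleware` and `run_chain` are hypothetical. The sketch assumes that the first middleware in the list is the outermost wrapper, as in typical middleware stacks.

```ruby
# Records when it is entered and left, and yields to the next step in between.
class TraceMiddleware
  def initialize(name, trace)
    @name = name
    @trace = trace
  end

  def call(item)
    @trace << "#{@name} before"
    result = yield
    @trace << "#{@name} after"
    result
  end
end

# Wraps the processing block so that the first middleware is the outermost one.
def run_chain(middlewares, item, &processing)
  chain = middlewares.reverse.reduce(processing) do |inner, middleware|
    -> { middleware.call(item, &inner) }
  end
  chain.call
end

trace = []
middlewares = [TraceMiddleware.new("first", trace), TraceMiddleware.new("second", trace)]

result = run_chain(middlewares, {id: 1}) do
  trace << "process item"
  true
end
# trace => ["first before", "second before", "process item", "second after", "first after"]
```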
Client middlewares
Rails.application.config.outbox.tap do |config|
config.create_item_middlewares.push(
'MyCreateItemMiddleware'
)
config.create_batch_middlewares.push(
'MyCreateBatchMiddleware'
)
end
class MyCreateItemMiddleware
def call(item_class, item_attributes)
yield
end
end
class MyCreateBatchMiddleware
def call(item_class, batch_attributes)
yield
end
end
Server middlewares
Example of a batch middleware:
Rails.application.config.outbox.tap do |config|
config.batch_process_middlewares.push(
'MyBatchMiddleware'
)
end
class MyBatchMiddleware
def call(job)
yield
end
end
Example of an item middleware:
Rails.application.config.outbox.tap do |config|
config.item_process_middlewares.push(
'MyItemMiddleware'
)
end
class MyItemMiddleware
def call(item)
yield
end
end
Example of a polling middleware:
Rails.application.config.outbox.tap do |config|
config.polling_item_middlewares.push(
'MyPollingItemMiddleware'
)
end
class MyPollingItemMiddleware
def call(item)
yield
end
end
The gem is optionally integrated with OpenTelemetry. If your main application has `opentelemetry-*` gems, the tracing will be configured automatically.
Outbox comes with a [React web application](https://github.com/SberMarket-Tech/sbmt-outbox-ui) that can list existing outbox and inbox models.
```ruby
Rails.application.routes.draw do
  mount Sbmt::Outbox::Engine => "/outbox-ui"
end
```
The path `/outbox-ui` cannot be changed for now.
Under the hood, it uses a React application provided as an npm package.
By default, the npm package is served from https://cdn.jsdelivr.net/npm/sbmt-outbox-ui@x.y.z/dist/assets/index.js. It can be changed with the following config option:
Rails.application.config.outbox.tap do |config|
config.cdn_url = "https://some-cdn-url"
end
UI development
If you want to implement some features for Outbox UI, you can serve JavaScript locally as follows:
- Start the React application with `npm run dev`
- Configure Outbox to serve UI scripts locally:
Rails.application.config.outbox.tap do |config|
config.ui.serve_local = true
end
We would like to see more features added to the web UI. If you have any suggestions, please feel free to submit a pull request 🤗.
CLI Arguments (v2: default)
| Key | Description |
|---|---|
| `--boxes` or `-b` | Outbox/Inbox processors to start |
| `--concurrency` or `-c` | Number of process threads. Default 4. |
| `--poll-concurrency` or `-p` | Number of poller partitions. Default 6. |
| `--poll-threads` or `-n` | Number of poll threads. Default 1. |
| `--poll-tactic` or `-t` | Poll tactic. Default "default". |
| `--worker-version` or `-w` | Worker version. Default 2. |
CLI Arguments (v1: DEPRECATED)
| Key | Description |
|---|---|
| `--boxes` or `-b` | Outbox/Inbox processors to start |
| `--concurrency` or `-c` | Number of threads. Default 10. |
Development & Test
Installation
- Install Dip
- Run `dip provision`
Usage
- Run `dip setup`
- Run `dip test`
See more commands at dip.yml.