Queue Bus
This gem uses Redis and background queues that you are already using to allow simple asynchronous communication between apps.
Install
To install, pick one of the adapters and add it to your Gemfile:
And add the appropriate tasks to your Rakefile:
require "resque_bus/tasks"
Example
Application A can publish an event
require 'resque-bus'
QueueBus.publish("user_created", "id" => 42, "first_name" => "John", "last_name" => "Smith")
QueueBus.publish_at(1.hour.from_now, "user_created", "id" => 42, "first_name" => "John", "last_name" => "Smith")
Application B is subscribed to events
require 'resque-bus'
QueueBus.dispatch("app_b") do
subscribe "user_created" do |attributes|
NameCount.find_or_create_by_name(attributes["last_name"]).increment!
end
critical "user_paid" do |attributes|
CreditCard.charge!(attributes)
end
subscribe /^user_/ do |attributes|
Metrics.record_user_action(attributes["bus_event_type"], attributes["id"])
end
subscribe "my_key", { "user_id" => :present, "page" => "homepage"} do
Mixpanel.homepage_action!(attributes["action"])
end
on_heartbeat "my_heartbeat_event", minute_interval: 5 do |attributes|
end
end
Applications can also subscribe within classes using the provided Subscriber
module.
class SimpleSubscriber
include QueueBus::Subscriber
subscribe :my_method
def my_method(attributes)
end
end
The following is equivalent to the original initializer and shows more options:
class OtherSubscriber
include QueueBus::Subscriber
application :app_b
subscribe :user_created
subscribe_queue :app_b_critical, :user_paid
subscribe_queue :app_b_default, :user_action, :bus_event_type => /^user_/
subscribe :homepage_method, :user_id => :present, :page => "homepage"
def user_created(attributes)
NameCount.find_or_create_by_name(attributes["last_name"]).increment!
end
def user_paid(attributes)
CreditCard.charge!(attributes)
end
def user_action(attributes)
Metrics.record_user_action(attributes["bus_event_type"], attributes["id"])
end
def homepage_method
Mixpanel.homepage_action!(attributes["action"])
end
end
Note: This subscribes when this class is loaded, so it needs to be in your load or otherwise referenced/required during app initialization to work properly.
Commands
Each app needs to tell Redis about its subscriptions:
$ rake queuebus:subscribe
See the adapter project for detils on running the workers.
Local Mode
For development, a local mode is provided and is specified in the configuration.
QueueBus.local_mode = :standalone
or
QueueBus.local_mode = :inline
Standalone mode does not require a separate resquebus:driver task to be running to process the
incoming queue. Simply publishing to the bus will distribute the incoming events
to the appropriate application specific queue. A separate resquebus:work task does
still need to be run to process these events
Inline mode skips queue processing entirely and directly dispatches the
event to the appropriate code block.
You can also say QueueBus.local_mode = :suppress
to turn off publishing altogether.
This can be helpful inside some sort of migration, for example.
Thread Safe Options
!! This is important if you are using workers that utilize multiple threads, such as Sidekiq !!
The above setting is global to the ruby process and modifying it will impact all threads that are
currently using QueueBus. If you want to isolate a thread or block of code from QueueBus, you can
use the methods with_local_mode
or in_context
:
With local mode
QueueBus.with_local_mode(:suppress) do
end
In context
QueueBus.in_context('some_context') do
end
The previous values will be restored after the block exits.
TODO
- Replace local modes with adapters
- There are a few spots in the code with TODO notes
- Make this not freak out in development without Redis or when Redis is down
- We might not actually need to publish in tests
- Add some rspec helpers for the apps to use: should_ post an event_publish or something along those lines