queue-bus

  • 0.13.3
  • Rubygems

Queue Bus

This gem uses Redis and the background queues you already run to provide simple asynchronous communication between apps.

Install

To install, pick one of the adapters and add it to your Gemfile:

  • resque-bus
  • sidekiq-bus

And add the appropriate tasks to your Rakefile:

require "resque_bus/tasks" # or sidekiq_bus/tasks

Example

Application A can publish an event

# pick an adapter
require 'resque-bus' # (or other adapter)

# business logic
QueueBus.publish("user_created", "id" => 42, "first_name" => "John", "last_name" => "Smith")

# or do it later
QueueBus.publish_at(1.hour.from_now, "user_created", "id" => 42, "first_name" => "John", "last_name" => "Smith")

Application B is subscribed to events

# pick an adapter
require 'resque-bus' # (or other adapter)

# initializer
QueueBus.dispatch("app_b") do
  # processes event on app_b_default queue
  # subscribe is shorthand for subscribing on your 'default' queue; this block will process events named "user_created"
  subscribe "user_created" do |attributes|
    NameCount.find_or_create_by_name(attributes["last_name"]).increment!
  end

  # processes event on app_b_critical queue
  # critical is shorthand for subscribing on your 'critical' queue; this block will process events named "user_paid"
  critical "user_paid" do |attributes|
    CreditCard.charge!(attributes)
  end

  # you can use any queue name you would like to process from as well, e.g. `banana "peeled" do |attributes|`

  # regexes work as well. Note that with the configuration above plus this regex,
  # a "user_created" event would run both this block and the corresponding block above
  subscribe /^user_/ do |attributes|
    Metrics.record_user_action(attributes["bus_event_type"], attributes["id"])
  end

  # the above all filter on just the event_type, but you can filter on anything
  # this would be _any_ event that has a user_id and the page value of homepage regardless of bus_event_type
  subscribe "my_key", { "user_id" => :present, "page" => "homepage" } do |attributes|
    Mixpanel.homepage_action!(attributes["action"])
  end

  # You may also declare a subscription to heartbeat events. This is a helper function
  # that works along with subscribe to make scheduling regular events easier.
  #
  #   minute_interval: Executes every n minutes
  #   hour_interval:   Executes every n hours
  #   minute:          Executes on this minute
  #   hour:            Executes on this hour
  on_heartbeat "my_heartbeat_event", minute_interval: 5 do |attributes|
  end
end
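
The filters used in the initializer above (an exact event name, a regex, and the special :present value) can be pictured with a small stand-alone model. This is only a sketch of the matching idea in plain Ruby; toy_match? is a hypothetical helper, not part of queue-bus, and the gem's real matcher may treat edge cases differently.

```ruby
# Toy matcher: a filter maps attribute names to a String, a Regexp, or :present.
# Illustrative only; this is not queue-bus's internal matching code.
def toy_match?(filter, attributes)
  filter.all? do |key, expected|
    value = attributes[key]
    case expected
    when :present then !value.nil? && value.to_s.strip != ""  # key must carry a value
    when Regexp   then expected.match?(value.to_s)            # pattern match
    else               expected.to_s == value.to_s            # exact match
    end
  end
end

event = { "bus_event_type" => "user_created", "id" => 42, "last_name" => "Smith" }

toy_match?({ "bus_event_type" => "user_created" }, event)          # => true
toy_match?({ "bus_event_type" => /^user_/ }, event)                # => true
toy_match?({ "user_id" => :present, "page" => "homepage" }, event) # => false
```

Because every key in the filter has to match, the last call fails: the sample event has neither a user_id nor a page attribute.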

Applications can also subscribe within classes using the provided Subscriber module.

class SimpleSubscriber
  include QueueBus::Subscriber
  subscribe :my_method

  def my_method(attributes)
    # heavy lifting
  end
end

The following is equivalent to the original initializer and shows more options:

class OtherSubscriber
  include QueueBus::Subscriber
  application :app_b

  subscribe :user_created
  subscribe_queue :app_b_critical, :user_paid
  subscribe_queue :app_b_default, :user_action, :bus_event_type => /^user_/
  subscribe :homepage_method, :user_id => :present, :page => "homepage"

  def user_created(attributes)
    NameCount.find_or_create_by_name(attributes["last_name"]).increment!
  end

  def user_paid(attributes)
    CreditCard.charge!(attributes)
  end

  def user_action(attributes)
    Metrics.record_user_action(attributes["bus_event_type"], attributes["id"])
  end

  def homepage_method(attributes)
    Mixpanel.homepage_action!(attributes["action"])
  end
end

Note: The subscription is registered when the class is loaded, so the class must be loaded (or otherwise referenced/required) during app initialization to work properly.
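
To see why load order matters, here is a minimal sketch of a class-level subscribe macro. ToySubscriber and its REGISTRY are hypothetical stand-ins written for this example, not QueueBus::Subscriber; the point is only that the registration runs while the class body is evaluated.

```ruby
# Hypothetical stand-in for a subscriber mixin; NOT QueueBus::Subscriber itself.
module ToySubscriber
  REGISTRY = []

  def self.included(base)
    base.extend(ClassMethods)
  end

  module ClassMethods
    # Runs while the class body is evaluated, i.e. when the file is loaded.
    def subscribe(method_name)
      ToySubscriber::REGISTRY << [name, method_name]
    end
  end
end

ToySubscriber::REGISTRY.empty?  # => true, no subscriber class has been loaded yet

class ToyHandler
  include ToySubscriber
  subscribe :my_method

  def my_method(attributes)
    # heavy lifting
  end
end

ToySubscriber::REGISTRY  # => [["ToyHandler", :my_method]]
```

If the file defining ToyHandler were never required, the registry would stay empty and no events would reach my_method.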

Commands

Each app needs to tell Redis about its subscriptions:

$ rake queuebus:subscribe

See the adapter project for details on running the workers.

Local Mode

For development, a local mode is provided and is specified in the configuration.

# config
QueueBus.local_mode = :standalone
# or
QueueBus.local_mode = :inline

Standalone mode does not require a separate resquebus:driver task to be running to process the incoming queue. Simply publishing to the bus distributes the incoming events to the appropriate application-specific queue. A separate resquebus:work task still needs to be run to process these events.

Inline mode skips queue processing entirely and directly dispatches the event to the appropriate code block.

You can also say QueueBus.local_mode = :suppress to turn off publishing altogether. This can be helpful inside some sort of migration, for example.
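
The difference between the three modes can be sketched with a toy bus. ToyBus below is a hypothetical class written for this example and only models the behaviour described above; it does not touch Redis and is not how queue-bus is implemented.

```ruby
# Toy bus illustrating the local modes described above; not queue-bus's implementation.
class ToyBus
  attr_reader :queue, :handled

  def initialize(mode)
    @mode = mode
    @queue = []    # stands in for an application-specific queue
    @handled = []  # events whose handler has run
  end

  def publish(event_type, attributes = {})
    event = attributes.merge("bus_event_type" => event_type)
    case @mode
    when :suppress   then nil                # drop the event entirely
    when :inline     then @handled << event  # dispatch immediately, no queue
    when :standalone then @queue << event    # enqueue; a worker must still drain it
    else raise ArgumentError, "this toy only models the three local modes"
    end
  end

  # Stands in for the separate work task that processes queued events.
  def work
    @handled.concat(@queue.shift(@queue.size))
  end
end

inline = ToyBus.new(:inline)
inline.publish("user_created", "id" => 42)
inline.handled.size      # => 1

standalone = ToyBus.new(:standalone)
standalone.publish("user_created", "id" => 42)
standalone.handled.size  # => 0
standalone.work
standalone.handled.size  # => 1
```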

Thread Safe Options

!! This is important if you are using workers that utilize multiple threads, such as Sidekiq !!

The above setting is global to the ruby process and modifying it will impact all threads that are currently using QueueBus. If you want to isolate a thread or block of code from QueueBus, you can use the methods with_local_mode or in_context:

With local mode

QueueBus.with_local_mode(:suppress) do
  # QueueBus will be suppressed on this thread, within this block.
end

In context

QueueBus.in_context('some_context') do
  # Context attribute will be set for all events published within this scope.
end

The previous values will be restored after the block exits.
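
The save, override, and restore behaviour can be illustrated with a thread-local variable and an ensure clause. ToyConfig is a hypothetical module for this sketch; it assumes the override is stored per thread and does not claim to mirror how QueueBus stores it internally.

```ruby
# Hypothetical module; shows the save/override/restore pattern, not QueueBus internals.
module ToyConfig
  @global_mode = nil

  class << self
    attr_accessor :global_mode

    # A thread-local override wins over the process-wide setting.
    def local_mode
      Thread.current[:toy_local_mode] || @global_mode
    end

    def with_local_mode(mode)
      previous = Thread.current[:toy_local_mode]
      Thread.current[:toy_local_mode] = mode
      yield
    ensure
      Thread.current[:toy_local_mode] = previous  # restored even if the block raises
    end
  end
end

ToyConfig.global_mode = :inline

ToyConfig.with_local_mode(:suppress) do
  ToyConfig.local_mode                         # => :suppress on this thread
  Thread.new { ToyConfig.local_mode }.value    # => :inline on any other thread
end

ToyConfig.local_mode                           # => :inline again
```

Setting global_mode changes behaviour for every thread, while with_local_mode only affects the calling thread for the duration of the block.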

TODO

  • Replace local modes with adapters
  • There are a few spots in the code with TODO notes
  • Make this not freak out in development without Redis or when Redis is down
  • We might not actually need to publish in tests
  • Add some rspec helpers for the apps to use: should_ post an event_publish or something along those lines

Package last updated on 26 Jul 2024
