
Welcome to EventQ.
EventQ is an event service bus framework for decoupling services and application processes.
Events are raised through the EventQ client, and subscribers of an event type are broadcast the event via a persistent queue for guaranteed delivery. Existing solutions such as ActiveJob assume the application posts directly to the queue provider. EventQ instead takes advantage of systems that fan out notifications: a single notification can have multiple subscribers, one of which is a queue that EventQ processes directly.
EventQ has a base layer that allows different queue implementations to be created, abstracting the specific queue implementation details away from your application code. EventQ ships with two default adapters: one for AWS SNS/SQS and another for RabbitMq (Fanout/Queue).
Add this line to your application's Gemfile:
gem 'eventq'
And then execute:
$ bundle install
Or install it yourself as:
$ gem install eventq
There are two adapters built into EventQ: one supports AWS SNS/SQS and the other supports RabbitMq. In order to use the appropriate adapter, you simply need to require the necessary file.
AWS
require 'eventq/aws'
RabbitMq
require 'eventq/rabbitmq'
A subscription queue should be defined to receive any events raised for the subscribed event type.
Attributes
Note: back-off calculations will not produce meaningful values if retry_delay is too small (eg: 30ms).
Example
# Create a queue that allows retries and accepts a maximum of 5 retries with a 20 second delay between retries.
class DataChangeAddressQueue < Queue
  def initialize
    @name = 'Data.Change.Address'
    @allow_retry = true
    @retry_delay = 20_000
    @max_retry_attempts = 5
  end
end
Retry Strategies
In distributed systems, some events are expected to fail. Thankfully, those events can be put "on hold" and will be processed again after a given waiting time. The attributes affecting your retry strategy the most are:
- retry_delay (the base duration that events wait before being reprocessed)
- max_receive_count and max_retry_attempts (limiting how often an event can be seen / processed)
- allow_retry, allow_retry_back_off and allow_exponential_back_off (defining whether retries are allowed and how the duration between retries should be calculated)
If only allow_retry is set to true, while allow_retry_back_off and allow_exponential_back_off remain false, the duration between retries will be retry_delay each time ("fixed back off").
So there is a fixed duration between events, like in the example for DataChangeAddressQueue above. With the configuration of that class, the event will be retried 5 times, with at least 20 seconds between retries. Therefore we can calculate that the final retry will have happened after retry_delay * max_retry_attempts, which results in 100 seconds here.
If allow_retry_back_off is also set to true, the duration between retries will scale with the number of retries ("incremental back off"). So the first retry will happen after retry_delay, the second after 2 * retry_delay, the third after 3 * retry_delay, and so on; the retries are spread further apart each time. The last retry will be processed after (max_retry_attempts * (max_retry_attempts + 1)) / 2 * retry_delay. In the example above, that results in 300 seconds until the last retry.
If allow_exponential_back_off is also set to true, the duration between retries will double each time ("exponential back off"). So the first retry will happen after retry_delay, the second after 2 * retry_delay, the third after 4 * retry_delay, and so on. The last retry will be processed after (2^max_retry_attempts - 1) * retry_delay. In the example above, that results in 620 seconds until the last retry.
You can run experiments on your retry configuration using plot_visibility_timeout.rb, which will output the retry duration on each retry given your settings.
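The three back-off modes described above can be sketched as a small helper (a standalone illustration mirroring the formulas in the text, not EventQ's actual implementation; the method name retry_schedule is hypothetical):

```ruby
# Compute the wait before each retry, in milliseconds, for the three
# back-off strategies described above.
def retry_schedule(retry_delay:, max_retry_attempts:, back_off: :fixed)
  (1..max_retry_attempts).map do |attempt|
    case back_off
    when :fixed       then retry_delay                       # constant delay
    when :incremental then attempt * retry_delay             # 1x, 2x, 3x, ...
    when :exponential then (2**(attempt - 1)) * retry_delay  # 1x, 2x, 4x, ...
    end
  end
end

retry_schedule(retry_delay: 20_000, max_retry_attempts: 5)
# => [20000, 20000, 20000, 20000, 20000]  (last retry after 100s in total)
retry_schedule(retry_delay: 20_000, max_retry_attempts: 5, back_off: :exponential)
# => [20000, 40000, 80000, 160000, 320000]  (last retry after 620s in total)
```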
Randomness
By default, there is no randomness in your retry strategy. However, that means that with a fixed 20 second back off, many events overloading your service will all come back after exactly 20 seconds, overloading it again. It can therefore be useful to introduce randomness into your retry duration, so that events which initially hit the queue at the same time are spread out when scheduled for retry.
The attribute retry_jitter_ratio allows you to configure how much randomness ("jitter") is applied to the retry duration. Let's assume we have retry_delay = 20_000 (20 seconds). Then retry_jitter_ratio has the following effect: with a ratio of 0 the retry duration is always 20 seconds, with a ratio of 50 it is chosen randomly between 10 and 20 seconds, and with a ratio of 100 it is chosen randomly between 0 and 20 seconds.
In the graphs below you can see how adding 50% randomness can help avoid overloading the service. In the first graph ("Fixed Retry Duration"), all failures hit the queue again after exactly 20 seconds. This leads to only a couple of events succeeding, as the others fail due to too many concurrent requests running into locks etc. In the second graph ("Randomised Retry Duration"), however, the events are randomly spread out over the next 10 to 20 seconds. This means fewer events hit the service concurrently, allowing it to successfully process more events and to finish all of the events in a shorter time, reducing the overall load on the service.
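One way jitter like this can be applied (a sketch assuming retry_jitter_ratio picks a duration uniformly between (100 - ratio)% and 100% of retry_delay; not EventQ's actual code):

```ruby
# Apply jitter to a retry delay (milliseconds). With ratio 0 the delay is
# unchanged; with ratio 50 it falls between 50% and 100% of retry_delay;
# with ratio 100 it falls anywhere between 0 and retry_delay.
def jittered_delay(retry_delay, retry_jitter_ratio)
  return retry_delay if retry_jitter_ratio.zero?

  minimum = retry_delay * (100 - retry_jitter_ratio) / 100
  minimum + rand(retry_delay - minimum + 1)
end

jittered_delay(20_000, 0)   # => 20000
jittered_delay(20_000, 50)  # => somewhere between 10000 and 20000
```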
In order to receive events within a subscription queue it must subscribe to the type of the event it should receive.
This method is called to subscribe a queue to an event type.
Params:
Example
#create an instance of the queue definition
queue = DataChangeAddressQueue.new
#subscribe the queue definition to an event type
subscription_manager.subscribe('Data:Change:Address', queue)
This method is called to unsubscribe a queue.
Params:
Example
#create an instance of the queue definition
queue = DataChangeAddressQueue.new
#unsubscribe the queue definition
subscription_manager.unsubscribe(queue)
The queue worker is used to process subscribed events from a subscription queue. The QueueWorker uses threads and is capable of processing subscribed events in parallel.
The on_retry_exceeded method allows you to specify a block that should execute whenever an event fails to process and exceeds the maximum allowed retry attempts specified by the queue. The event object passed to the block is a [QueueMessage] object.
Example
worker.on_retry_exceeded do |event|
....
#Do something with the failed event
....
end
The on_retry method allows you to specify a block that should execute whenever an event fails to process and is retried. The event object passed to the block is a [QueueMessage] object, and the abort arg is a Boolean that specifies if the message was aborted (true or false).
[NOTE: The message will be automatically retried so no manual action is required, this is to allow additional logging etc to be performed]
Example
worker.on_retry do |event, abort|
....
#Do something with the failed event
....
end
The on_error method allows you to specify a block that should execute whenever an unhandled error occurs within the worker. This could be a communication failure with the queue etc.
Example
worker.on_error do |error|
....
#Do something with the error
....
end
The start method is used to specify a block to process received events and start the worker.
Params:
Options:
:fork_count [Int] [Optional] [Default=1] This is the number of process forks that the queue worker will use to process events in parallel. (Additional forks should be added to take advantage of multi-core CPUs.)
:thread_count [Int] [Optional] [Default=5] This is the number of threads that the queue worker should use to process events in parallel.
:sleep [Number] [Optional] [Default=15] This is the number of seconds a thread should sleep before attempting to request another event from the subscription queue when no event has been received.
:wait [Bool] [Optional] This is used to specify that the start method should block the calling thread and wait until all parallel threads have finished. (This can be used to ensure that the background process running the worker does not exit).
Block arguments:
content [Object] This is the content of the received event.
type [String] This is the type of the received event.
retry_attempts [Int] This is the number of times the received event has been retried.
Example
#start the queue worker
worker.start(queue, {:thread_count => 8, :sleep => 30 }) do |content,type,retry_attempts|
....
#add event processing code here
....
end
This method is called to stop the QueueWorker and all threads.
Note: This is only available when the :wait option has not been specified for the #start method.
Example
#stop the worker
worker.stop
The [QueueMessage] is used internally to represent an event within the various queues. It is also returned as a parameter to the #on_retry_exceeded block of a [QueueWorker].
Attributes:
The EventQ::Configuration class allows global configuration options to be specified.
This is used to specify the serialization provider that should be used for event serialization & deserialization.
Options:
- OJ_PROVIDER [Default] This is a serialization provider that uses the 'oj' gem to handle serialization & deserialization.
- JSON_PROVIDER This is a serialization provider that uses the 'json' gem to handle serialization & deserialization.
#set the serialization provider configuration to the OJ_PROVIDER
EventQ::Configuration.serialization_provider = EventQ::SerializationProviders::OJ_PROVIDER
..
#set the serialization provider configuration to the JSON_PROVIDER
EventQ::Configuration.serialization_provider = EventQ::SerializationProviders::JSON_PROVIDER
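Whichever provider is configured, its job is the same: turn an event into a string for the queue and back again. A minimal round trip using the 'json' gem (the event shape here is illustrative only):

```ruby
require 'json'

# Serialize an event for the queue and deserialize it on the way out.
event   = { 'type' => 'Data:Change:Address', 'content' => { 'street' => '1 Main St' } }
payload = JSON.generate(event)  # the string written to the queue
JSON.parse(payload) == event    # => true, a lossless round trip
```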
This is used to specify the signature provider that should be used for message signing.
Options:
- SHA256 [Default] This provider uses SHA256 to create message signatures.
This is used to specify the signature secret that should be used for message signing.
#set the signature secret
EventQ::Configuration.signature_secret = 'secret key'
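As an illustration of what SHA256-based message signing involves, here is a sketch using OpenSSL's HMAC (this is not necessarily how EventQ computes its signatures):

```ruby
require 'openssl'

secret  = 'secret key'
payload = '{"type":"Data:Change:Address"}'

# The sender attaches an HMAC-SHA256 signature derived from the shared secret.
signature = OpenSSL::HMAC.hexdigest('SHA256', secret, payload)

# The receiver recomputes it with the same secret and compares,
# rejecting messages whose payload has been tampered with.
OpenSSL::HMAC.hexdigest('SHA256', secret, payload) == signature  # => true
```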
The NonceManager is used to prevent duplicate messages from being processed. Each event message that is raised is given a unique identifier; most message queue providers guarantee at-least-once delivery, which may result in a message being delivered more than once. If your use case needs to enforce once-only processing, the NonceManager can be configured to prevent duplicate messages from being processed. (It is a distributed store that currently uses Redis locks to ensure accuracy between scaled-out workers.)
This method is called to configure the NonceManager, and must be called before starting the queue worker for it to be active.
Params:
Example
EventQ::NonceManager.configure(server: 'redis://127.0.0.1:6379')
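The de-duplication idea can be sketched with an in-memory set standing in for the Redis-backed store (the class and method names here are hypothetical, for illustration only):

```ruby
require 'set'

class InMemoryNonceManager
  def initialize
    @seen = Set.new
  end

  # Returns true the first time a nonce is seen, false for duplicates.
  # Set#add? returns nil when the element was already present.
  def first_delivery?(nonce)
    !@seen.add?(nonce).nil?
  end
end

manager = InMemoryNonceManager.new
manager.first_delivery?('msg-123')  # => true  (first delivery: process it)
manager.first_delivery?('msg-123')  # => false (redelivery: skip it)
```

A production store must be shared and atomic across workers, which is why EventQ uses Redis locks rather than per-process state like this.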
This attribute is used to specify a namespace for all events and queues to be created within.
Example
EventQ.namespace = 'development'
The status checker is used to verify the status of a queue or event type (topic/exchange).
#### queue?
This method is called to verify connection to a queue.
Params:
Return [Boolean] (True or False)
Example
available = status_checker.queue?(queue)
#### event_type?
This method is called to verify connection to an event_type (topic/exchange).
Params:
Return [Boolean] (True or False)
Example
available = status_checker.event_type?(event_type)
After checking out the repo, run bin/setup to install dependencies. You can also run bin/console for an interactive prompt that will allow you to experiment. To install this gem onto your local machine, run bundle exec rake install.
Run the setup script of eventq to build the environment. This will create the eventq image.
$ ./script/setup.sh
By default, the full test suite will run against the mock AWS services defined in the docker-compose.yml file. It will also run the tests for RabbitMq.
If you want to run the tests against AWS directly, you will need an AWS account. Put your credentials into the .aws.env file in the parent directory. You will also need to comment out the AWS_* environment variables in the docker-compose.yml file:
$ cp ../.aws.env.template ../.aws.env
$ vi ../.aws.env
Run the whole test suite:
$ ./script/test.sh
You can run the specs that don't depend on an AWS account with:
$ ./script/test.sh --tag ~integration
To release a new version, first update the version number in the EVENTQ_VERSION file. With that change merged to master, draft a new release with the same version you specified in EVENTQ_VERSION, and use "Generate Release Notes" to generate details for the release. This will create a git tag for the version and trigger the GitHub workflow (defined in publish.yml) that publishes the new gem to rubygems.org.
Bug reports and pull requests are welcome on GitHub at https://github.com/sage/eventq. This project is intended to be a safe, welcoming space for collaboration, and contributors are expected to adhere to the Contributor Covenant code of conduct.
EventQ is available as open source under the terms of the MIT licence.
Copyright (c) 2018 Sage Group Plc. All rights reserved.