JobReactor is a library for creating, scheduling, and processing background jobs. It is an asynchronous client-server distributed system based on EventMachine.
To use JobReactor with Sinatra or Ruby on Rails you should start the distributor in an initializer using the JR.run method (it launches EventMachine in a separate thread). Then add rake task(s) which will run the node(s).
If you use a server based on EventMachine, such as Thin, use the JR.wait_em_and_run method, which initializes JobReactor once EventMachine has started.
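As a rough sketch (the initializer path and the host/port values are assumptions, not something JobReactor prescribes), a Rails setup could look like this:

# config/initializers/job_reactor.rb  (path is an assumption for a Rails app)
require 'job_reactor'

# Plain app server: JR.run launches EventMachine in a separate thread.
JR.run do
  JR.start_distributor('localhost', 5000)
end

# With an EventMachine-based server such as Thin, use JR.wait_em_and_run instead,
# so JobReactor initializes once EventMachine has started:
# JR.wait_em_and_run do
#   JR.start_distributor('localhost', 5000)
# end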
So, read the 'features' section and try JobReactor. You can do a lot with it.
JobReactor is based on EventMachine. Jobs are launched in the EM reactor loop in one thread. There are advantages and disadvantages. The main benefit is fast scheduling, saving, and loading. The weak point is the processing of heavy background jobs: when each job takes minutes or hours, it will block the reactor and break normal processing.
If you can't divide 'THE BIG JOB' into 'small pieces' you shouldn't use JobReactor. See alternatives such as DelayedJob or Resque.
JobReactor is the right solution if you have thousands, millions, and, we hope, billions of relatively small jobs.
gem install job_reactor
You should install Redis if you want to persist your jobs.
$ sudo apt-get install redis-server
In your main application:
application.rb
require 'job_reactor'

JR.run do
  JR.start_distributor('localhost', 5000) # see lib/job_reactor/job_reactor.rb
end

sleep(1) until JR.ready?

# The application
loop do
  sleep(3) # your application is working
  JR.enqueue 'my_job', {arg1: 'Hello'}
end
Define 'my_job' in a separate directory (files with job definitions must live in a separate directory):
reactor_jobs/my_jobs.rb
include JobReactor

job 'my_job' do |args|
  puts args[:arg1]
end
And the last file, the worker code:
worker.rb
require 'job_reactor'

JR.config[:job_directory] = 'reactor_jobs' # this is the default, so you can omit this line

JR.run! do
  JR.start_node({
    :storage => 'memory_storage',
    :name => 'worker_1',
    :server => ['localhost', 5001],
    :distributors => [['localhost', 5000]]
  }) # see lib/job_reactor/job_reactor.rb
end
Run 'application.rb' in one terminal window and 'worker.rb' in another. The node connects to the distributor, receives the job, and runs it. Cool! But this was the simplest example; see the 'examples' directory.
There are no priorities like in Delayed::Job or Stalker, but there are flexible node-based priorities. You can specify the node that should execute the job and the node that is forbidden for a given job. You can reserve several nodes for high-priority jobs.
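For instance (the node and job names here are made up for illustration), you could dedicate one node to urgent work and keep routine jobs away from it:

# Route high-priority jobs to a reserved node and keep other jobs off it.
# 'fast_node', 'urgent_job' and 'regular_job' are hypothetical names.
JR.enqueue 'urgent_job',  {order_id: 1}, {node: 'fast_node'}
JR.enqueue 'regular_job', {order_id: 2}, {not_node: 'fast_node'}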
JR.run do
  JR.start_distributor('localhost', 5000)
end
This code runs the EventMachine reactor loop in a new thread and calls the given block. JR.start_distributor starts an EventMachine TCP server on the given host and port. Now JobReactor is ready to work.
JR.run! do
  JR.start_node({
    storage: 'redis_storage',
    name: 'redis_node1',
    server: ['localhost', 5001],
    distributors: [['localhost', 5000]]
  })
end
This code runs the EventMachine reactor loop in the main thread (this is the difference between run and run!) and starts the node inside the reactor.
When the node starts, it retries any uncompleted jobs stored for it (if JR.config[:retry_jobs_at_start] is true).
JR.enqueue('my_job', {arg1: 1, arg2: 2}, {after: 20}, success, error)
The first argument is the name of the job, the second is the arguments hash for the job, and the third is the options hash. If you don't specify any options, the job will be an instant job and will be sent to any free node. You can use the following options (a short sketch follows the list):
defer: true or false - the node will run the job in an 'EM.defer' block. Be careful: the default EM threadpool size is 20. You can increase it by setting EM.threadpool_size = 'your value', but it is not recommended;
after: seconds - the node will try to run the job after the given number of seconds;
run_at: time - the node will try to run the job at the given time;
period: seconds - the node will run the job periodically, every given number of seconds.
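Here is a brief sketch of these options in use, reusing 'my_job' from the Quick Start (the argument values and timings are arbitrary):

# Run inside EM.defer's thread pool (useful for blocking calls):
JR.enqueue 'my_job', {arg1: 'blocking work'}, {defer: true}

# Run roughly one minute from now:
JR.enqueue 'my_job', {arg1: 'later'}, {after: 60}

# Run at a specific time:
JR.enqueue 'my_job', {arg1: 'scheduled'}, {run_at: Time.now + 3600}

# Run every 5 minutes:
JR.enqueue 'my_job', {arg1: 'recurring'}, {period: 300}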
You can add node: 'node_name' and not_node: 'node_name' to the options. These specify the node on which the job should or should not be run. For example:
JR.enqueue('my_job', {arg1: 1}, {period: 100, node: 'my_favourite_node', not_node: 'do_not_use_this_node'})
The rule to use the specified node is not strict if JR.config[:always_use_specified_node] is false (the default). This means the distributor will try to send the job to the given node first, but if that node is locked (maybe you have just sent another job to it and it is very busy) the distributor will look for another node.
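If you need the node choice to be strict (presumably meaning jobs with a :node option go only to that node), flip the option in your setup; a one-line sketch:

# Jobs enqueued with a :node option will only be sent to that node.
JR.config[:always_use_specified_node] = true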
The last two arguments are optional. The first is the 'success feedback' and the last is the 'error feedback'. We use the term 'feedback' to distinguish them from 'callbacks' and 'errbacks': a 'feedback' is executed on the main application side, while 'callbacks' run on the node side. Feedbacks are procs which are called when the node reports that the job has completed (successfully or not). The arguments for a feedback are the arguments of the initial job plus everything added on the node side.
Example:
# in your 'job_file'
job 'my_job' do |args|
  # do smth
  args.merge!(result: 'Yay!')
end

# in your application
# success feedback
success = proc { |args| puts args }
# enqueue job
JR.enqueue('my_job', {arg1: 1}, {}, success)
The 'success' proc args will be {arg1: 1, result: 'Yay!'}.
The same applies to the 'error feedback'. Note that the error feedback is launched only after all attempts have failed on the node side.
See the config options: JR.config[:max_attempt] = 10 and JR.config[:retry_multiplier].
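A minimal sketch of wiring an error feedback (the job name is hypothetical, and it assumes that a job raising an exception counts as a failed attempt):

# in your 'job_file'
job 'shaky_job' do |args|
  raise 'something went wrong' if args[:fail]
end

# in your application
success = proc { |args| puts 'done' }
error   = proc { |args| puts "failed after retries: #{args[:error]}" }
JR.enqueue('shaky_job', {fail: true}, {}, success, error)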
'callbacks', 'errbacks', 'success feedback', and 'error feedback' help you divide the job into small, relatively independent parts.
To define a 'job' you use the JobReactor.job method (see the 'Quick start' section). The only arguments are the 'job_name' and the block, which is the job itself.
You can define any number of callbacks and errbacks for a given job. Just use the JobReactor.job_callback and JobReactor.job_errback methods. There are three arguments for callbacks and errbacks: the name of the job, the name of the callback/errback (optional), and the block.
include JobReactor
job 'test_job' do |args|
  puts "job with args #{args}"
end

job_callback 'test_job', 'first_callback' do |args|
  puts "first callback with args #{args}"
end

job_callback 'test_job', 'second_callback' do |args|
  puts "second callback with args #{args}"
end

job_errback 'test_job', 'first_errback' do |args|
  puts "first errback with error #{args[:error]}"
end

job_errback 'test_job', 'second_errback' do |args|
  puts 'another errback'
end
Callbacks and errbacks act as ordinary EventMachine::Deferrable callbacks and errbacks. The 'job' is the first callback, the first 'job_callback' becomes the second callback, and so on. See lib/job_reactor/job_reactor/job_parser.rb for more information. When the node starts a job it calls the succeed method on the 'job object' with the given argument (args). This runs all callbacks sequentially. If an error occurs in any callback, the node calls the fail method on the 'deferrable' object with the same args (plus a merged :error => 'Error message').
Note, you define jobs, callbacks, and errbacks in top-level scope, so self is the main object.
You can merge! additional key-value pairs into 'args' in the job to exchange information between the job and its callbacks.
include JobReactor
job 'test_job' do |args|
  args.merge!(result: 'Hello')
end

job_callback 'test_job', 'first_callback' do |args|
  puts args[:result]
  args.merge!(another_result: 'world')
end

job_callback 'test_job', 'second_callback' do |args|
  puts "#{args[:result]} #{args[:another_result]}"
end
Note that if an error occurs you won't see these additional arguments in the job's errbacks.
Another trick is the JR.config[:merge_job_itself_to_args] option, which is false by default. If you set this option to true you will see a :job_itself key in args. The value contains a lot of useful information about the job ('name', 'attempt', 'status', 'make_after', 'node', etc.).
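A small sketch of the option in use (whether the :job_itself keys are strings or symbols is an assumption here, so inspect args in practice):

# somewhere in your setup, before jobs are processed
JR.config[:merge_job_itself_to_args] = true

# in your 'job_file'
job 'introspective_job' do |args|
  info = args[:job_itself]
  puts "running #{info['name']}, attempt #{info['attempt']}" # key access style is an assumption
end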
Feedbacks are defined as Proc objects and attached to the 'job' when it is enqueued on the application side.
success = Proc.new { |args| puts 'Success' }
error = Proc.new { |args| puts 'Error' }
JR.enqueue('my_job', {arg1: 1, arg2: 2}, {after: 100}, success, error)
These procs will be called when the node reports success or error. The 'args' for the corresponding proc are the same 'args' used in the job (and its callbacks) on the node side, so you can, for example, return a result by merging it into 'args' in the job (or its callbacks).
Note that feedbacks are kept in memory in your application, so they disappear when you restart the application.
Now you can store your jobs in Redis ('redis_storage') or in memory ('memory_storage'). We use the em-hiredis gem for the Redis storage.
Only the first, of course, 'really' persists the jobs. You can use the latter if you don't want to install Redis, don't need to retry jobs, and need more speed (by the way, the difference in performance is not that great: Redis is very fast).
You can easily integrate your own storage. Just make it EventMachine compatible.
The default URL for the Redis server is:
JR.config[:hiredis_url] = "redis://127.0.0.1:6379/0"
JobReactor works with Redis asynchronously (via the em-hiredis gem mentioned above) to increase speed. Several nodes can use one Redis storage.
Information about jobs is saved several times during processing.
By default JobReactor deletes all completed and cancelled jobs, but you can configure this behaviour. The default options are:
JR.config[:remove_done_jobs] = true
JR.config[:remove_cancelled_jobs] = true
JR.config[:remove_failed_jobs] = false
JR.config[:retry_jobs_at_start] = true
We provide a simple JR::RedisMonitor module to check the Redis storage from an irb console (or from your app).
It uses the synchronous redis gem.
Connect to Redis with:
JR.config[:redis_host] = 'localhost'
JR.config[:redis_port] = 6379
See methods:
JR::RedisMonitor.jobs_for(node_name)
JR::RedisMonitor.load(job_id)
JR::RedisMonitor.destroy(job_id)
JR::RedisMonitor.destroy_all_jobs_for(node_name)
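A rough irb session sketch (it assumes jobs_for returns a list of job identifiers that can be passed to load and destroy; the node name is taken from the Quick Start example):

require 'job_reactor'

JR.config[:redis_host] = 'localhost'
JR.config[:redis_port] = 6379

ids = JR::RedisMonitor.jobs_for('worker_1')         # assumed to return job ids for the node
job = JR::RedisMonitor.load(ids.first) if ids && ids.any?
# JR::RedisMonitor.destroy(ids.first)               # remove a single job
# JR::RedisMonitor.destroy_all_jobs_for('worker_1') # remove everything stored for the node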
The MIT License - Copyright (c) 2012-2013 Anton Mishchuk