in-parallel
A lightweight Ruby library with very simple syntax, making use of Process.fork to execute code in parallel.
Use Cases
Many other Ruby libraries that simplify parallel execution support one primary use case - crunching through a large queue of small, similar tasks as quickly and efficiently as possible. This library primarily supports the use case of executing a few larger and unrelated tasks in parallel, automatically managing the stdout and passing return values back to the main process. This library was created to be used by Puppet's Beaker test framework to enable parallel execution of some of the framework's tasks, and allow users to execute code in parallel within their tests.
If you are looking for something that excels at executing a large queue of tasks in parallel as efficiently as possible, you should take a look at the parallel project.
Install
gem install in-parallel
Usage
include InParallel to use as a mix-in.
The methods below allow you to fork processes to execute multiple methods or blocks within an enumerable in parallel. They all have this common behavior:
- STDOUT is captured for each forked process and logged all at once when the process completes or is terminated.
- By default, parallel execution waits until all processes are complete before continuing (with the exception of run_in_background).
- You can specify the parameter kill_all_on_error=true if you want to immediately exit all forked processes as soon as any one of them hits an error.
- When a forked process raises an exception or exits with a non-zero exit code, an exception will be raised in the main process.
- Terminating the main process with 'ctrl-c' or killing the process in some other way will immediately cause all forked processes to be killed and log their STDOUT up to that point.
- If the result of the method or block can be marshalled, it will be returned as though it was executed within the same process. If the result cannot be marshalled, a warning is produced and the return value will be nil.
- NOTE: results of methods within run_in_parallel can be assigned to instance or class variables, but not local variables. See examples below.
- Execution times out (stops and raises an exception) based on the global timeout value or the timeout parameter.
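The return-value behavior in the list above can be pictured with a minimal sketch that uses only Ruby's standard library. This is not the library's implementation; `run_forked` is a hypothetical helper that assumes a platform where Process.fork is available. It runs a block in a child process, marshals the result through a pipe, and falls back to nil when the result cannot be marshalled.

```ruby
# Hypothetical helper (not part of in-parallel): run a block in a forked
# child and pass its return value back to the parent through a pipe.
def run_forked
  reader, writer = IO.pipe
  reader.binmode
  writer.binmode

  pid = Process.fork do
    reader.close
    begin
      result = yield
      payload = begin
        Marshal.dump(result)
      rescue TypeError
        # Procs, IO objects and similar values cannot be marshalled.
        warn 'result could not be marshalled, returning nil'
        Marshal.dump(nil)
      end
      writer.write(payload)
      writer.close
      exit!(0) # skip at_exit handlers inherited from the parent
    rescue StandardError => e
      warn "forked block failed: #{e.message}"
      exit!(1)
    end
  end

  writer.close # the parent only reads
  data = reader.read # returns once the child has closed its end
  reader.close
  _, status = Process.wait2(pid)
  raise "forked process #{pid} exited with #{status.exitstatus}" unless status.success?

  Marshal.load(data)
end

value = run_forked { { foo: 'bar' } }   # a Hash can be marshalled
unmarshallable = run_forked { proc {} } # a Proc cannot, so nil comes back
```

A non-zero exit status in the child becomes an exception in the parent, which mirrors the error behavior described in the list.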
Methods
run_in_parallel(timeout=nil, kill_all_on_error = false, &block)
- Each method in a block will be executed in parallel (unless the method is defined in Kernel or BasicObject).
- Any methods further down the stack won't be affected, only the ones directly within the block.
- Waits for each process in real time and logs its output immediately when that process completes.
def method_with_param(name)
ret_val = "hello #{name} \n"
puts ret_val
ret_val
end
def method_without_param
ret_val = {:foo => "bar"}
puts ret_val
return ret_val
end
run_in_parallel do
@result_1 = method_with_param('world')
@result_2 = method_without_param
end
puts "#{@result_1}, #{@result_2[:foo]}"
stdout:
Forked process for 'method_with_param' - PID = '49398'
Forked process for 'method_without_param' - PID = '49399'
------ Begin output for method_with_param - 49398
hello world
------ Completed output for method_with_param - 49398
------ Begin output for method_without_param - 49399
{:foo=>"bar"}
------ Completed output for method_without_param - 49399
hello world, bar
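The grouped output shown above can be approximated with a short sketch. It is an illustration under assumptions, not the library's code: `capture_forked_stdout` is a hypothetical helper that points the child's $stdout at a pipe, so everything the child prints is collected and can be logged in one piece after the child exits.

```ruby
# Hypothetical helper (not part of in-parallel): collect everything a
# forked child prints and hand it back once the child has exited.
def capture_forked_stdout
  reader, writer = IO.pipe

  pid = Process.fork do
    reader.close
    $stdout = writer # puts/print in the child now write to the pipe
    begin
      yield
      writer.flush
      exit!(0)
    rescue StandardError => e
      warn "forked block failed: #{e.message}"
      exit!(1)
    end
  end

  writer.close
  output = reader.read # blocks until the child exits and the pipe closes
  reader.close
  Process.wait(pid)
  [pid, output]
end

pid, output = capture_forked_stdout { puts 'hello world' }
puts "------ Begin output for block - #{pid}"
puts output
puts "------ Completed output for block - #{pid}"
```

Reassigning $stdout only captures output written through Ruby's own IO methods; output from subprocesses started by the child would need the underlying file descriptor to be redirected instead.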
Enumerable.each_in_parallel(identifier=nil, timeout=(InParallel::InParallelExecutor.timeout), kill_all_on_error = false, &block)
- This is very similar to other solutions, except that it directly extends Enumerable with an each_in_parallel method, so you can spawn a process for each item in an array or hash.
- Identifies the block location (or caller location if the block does not have a source_location) in the console log to make it clear which block is being executed
- The identifier parameter is used only for logging; if it is omitted, the block source location is logged instead.
["foo", "bar", "baz"].each_in_parallel { |item| puts item }
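As a rough picture of what spawning a process per item involves, here is a minimal sketch built on Process.fork. `each_forked` is a hypothetical stand-in, not the gem's each_in_parallel: it forks one child per item, waits for all of them, and raises in the parent if any child failed. It does not capture output or return block results.

```ruby
# Hypothetical stand-in for each_in_parallel (not the gem's code):
# fork one child per item, then wait for every child before returning.
def each_forked(items)
  $stdout.flush # keep children from re-printing buffered parent output

  pids = items.map do |item|
    Process.fork do
      begin
        yield item
        $stdout.flush
        exit!(0)
      rescue StandardError => e
        warn "#{item.inspect} failed: #{e.message}"
        exit!(1)
      end
    end
  end

  # Reap every child and collect its exit status.
  statuses = pids.map { |pid| Process.wait2(pid).last }
  failed = statuses.reject(&:success?)
  raise "#{failed.size} forked process(es) failed" unless failed.empty?

  items
end

each_forked(%w[foo bar baz]) { |item| puts item }
```

Because the children run concurrently, the order in which the three items are printed is not guaranteed.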
run_in_background(ignore_results = true, &block)
- This does basically the same thing as run_in_parallel, except that it does not wait for all processes to complete; it returns immediately.
- You can optionally ignore results completely (default) or delay evaluating the results until later.
- You can run multiple blocks in the background and then at some later point evaluate all of the results.
TMP_FILE = '/tmp/test_file.txt'
def create_file_with_delay(file_path)
sleep 2
File.open(file_path, 'w') { |f| f.write('contents') }
return true
end
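# Example 1: ignore results (the default)
run_in_background { create_file_with_delay(TMP_FILE) }
# The file should not exist yet; the background process is still sleeping
puts(File.exist?(TMP_FILE))
sleep(3)
# The file should exist once the delay has passed
puts(File.exist?(TMP_FILE))
# Example 2: delay evaluating the results
run_in_background(false) { @result = create_file_with_delay(TMP_FILE) }
run_in_background(false) { @result2 = create_file_with_delay('/tmp/someotherfile.txt') }
# @result has not been resolved yet; this prints "unresolved_parallel_result_0"
puts @result
# Wait for the background processes, after which the instance variables hold the results
wait_for_processes
puts @result
puts @result2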
wait_for_processes(timeout=nil, kill_all_on_error = false)
- Used only after run_in_background with ignore_results=false
- Optional args for timeout and kill_all_on_error
- See run_in_background for examples
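The start-now, collect-later pattern with an optional timeout can be sketched with the standard library alone. The names `start_in_background` and `wait_for_all` are hypothetical and are not the gem's API; the sketch only illustrates how deferred results and a timeout might fit together.

```ruby
require 'timeout'

# Hypothetical helper (not the in-parallel API): fork a child for the
# block and return immediately with a handle for collecting the result.
def start_in_background
  reader, writer = IO.pipe
  pid = Process.fork do
    reader.close
    begin
      writer.write(Marshal.dump(yield))
      writer.close
      exit!(0)
    rescue StandardError
      exit!(1)
    end
  end
  writer.close
  { pid: pid, reader: reader }
end

# Hypothetical helper: block until every job finishes and return the
# results in order. On timeout, kill and reap whatever is still running.
def wait_for_all(jobs, timeout: nil)
  Timeout.timeout(timeout) do
    jobs.map do |job|
      data = job[:reader].read
      job[:reader].close
      _, status = Process.wait2(job[:pid])
      raise "process #{job[:pid]} failed" unless status.success?
      Marshal.load(data)
    end
  end
rescue Timeout::Error
  jobs.each do |job|
    begin
      Process.kill('KILL', job[:pid])
      Process.wait(job[:pid])
    rescue SystemCallError
      # The process had already exited and been reaped.
    end
  end
  raise
end

jobs = [
  start_in_background { sleep 0.1; :first },
  start_in_background { 'second' }
]
# ... the main process is free to do other work here ...
results = wait_for_all(jobs, timeout: 5)
```

Passing `timeout: nil` waits indefinitely, since Timeout.timeout runs the block without a limit when given nil.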
Global Options
You can get or set the following values to set global defaults. These defaults can also be specified per execution by supplying the values as parameters to the parallel methods.
parallel_signal_interval
parallel_default_timeout
@logger.log_level
Releasing
Follow these steps to publish a new GitHub release, and build and push the gem to https://rubygems.org.
- Bump the "VERSION" in lib/in-parallel/version.rb appropriately based on changes in CHANGELOG.md since the last release.
- Run ./release-prep to update Gemfile.lock and CHANGELOG.md.
- Commit and push changes to a new branch, then open a pull request against main and be sure to add the "maintenance" label.
- After the pull request is approved and merged, navigate to Actions --> Release Gem --> run workflow --> Branch: main --> Run workflow.