JRubyParallelProcessing
Overview
JRubyParallelProcessing is a gem for parallel processing in JRuby. It provides multithreaded data processing and API request handling, a task queue that executes tasks with priorities and retries, and a distributed worker for running tasks on other nodes.
Installation
Add this gem to your Gemfile:
gem 'jruby_parallel_processing'
Usage
DataProcessor
DataProcessor is a class for parallel data processing. It splits data into chunks and processes them across multiple threads. Because JRuby threads run in parallel as native JVM threads, this can shorten execution time when the per-item work is independent. It also supports middleware hooks that run before and after processing.
Basic Example
require 'jruby_parallel_processing'
data = (1..100).to_a
processor = JRubyParallelProcessing::DataProcessor.new(data_array: data, in_threads: 4)
processor.process do |item|
puts item
end
Features
- Efficient Parallel Processing: Process data using multiple threads.
- Chunk Management: Automatically splits data into manageable chunks.
- Stream Support: Handles data processing from streams (e.g., IO, StringIO).
- Middleware: Supports before and after processing middleware for custom logic injection.
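To make the chunking idea above concrete, here is a minimal sketch in plain Ruby. It is not the gem's implementation: `process_in_chunks` is a hypothetical helper, and its `in_threads` and `chunk_size` parameters only borrow the names used in this README. It splits an array into chunks, hands the chunks to a fixed number of threads, and collects the block's return values.

```ruby
# Hypothetical helper, not part of the gem: split `items` into chunks and
# process each chunk on one of a fixed number of worker threads.
def process_in_chunks(items, in_threads: 4, chunk_size: 10)
  chunks = Queue.new
  items.each_slice(chunk_size) { |chunk| chunks << chunk }
  chunks.close # once drained, pop returns nil and the workers stop

  results = Queue.new
  workers = Array.new(in_threads) do
    Thread.new do
      while (chunk = chunks.pop)
        chunk.each { |item| results << yield(item) }
      end
    end
  end
  workers.each(&:join)

  # Results arrive in completion order, not input order.
  Array.new(results.size) { results.pop }
end

squares = process_in_chunks((1..100).to_a, in_threads: 4, chunk_size: 25) { |n| n * n }
puts squares.sort.first(5).inspect
```

Because threads finish in an unpredictable order, the sketch sorts the results before printing. Whether the gem preserves input order is not stated here.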
Configuration
- data_array: Array of data to be processed.
- stream: Stream object (e.g., IO, StringIO) for processing data line-by-line.
- in_threads: Number of threads for parallel processing (default is 4).
- chunk_size: Size of chunks for processing.
- logger: Logger object for custom logging.
- queue_size: Size of the queue for stream processing (default is 100).
- timeout: Timeout for stream processing (default is 10 seconds).
Example Usage
Middleware Example
processor = JRubyParallelProcessing::DataProcessor.new(data_array: data, in_threads: 4)
processor.add_middleware(:before_process) do
puts "Starting processing..."
end
processor.add_middleware(:after_process) do
puts "Finished processing."
end
processor.process do |item|
puts item
end
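The hook mechanism can be pictured with a small stand-alone class. This is only a sketch of the general before/after pattern: `HookedProcessor` is a hypothetical name, it runs sequentially, and it makes no claim about how the gem stores or invokes its middleware.

```ruby
# Hypothetical stand-in for a processor with hooks; not the gem's class.
class HookedProcessor
  def initialize(items)
    @items = items
    @hooks = { before_process: [], after_process: [] }
  end

  # Register a block under a known hook name (raises KeyError otherwise).
  def add_middleware(name, &block)
    @hooks.fetch(name) << block
    self
  end

  # Run the before hooks, process every item, then run the after hooks.
  def process
    @hooks[:before_process].each(&:call)
    results = @items.map { |item| yield(item) }
    @hooks[:after_process].each(&:call)
    results
  end
end

events = []
hooked = HookedProcessor.new([1, 2, 3])
hooked.add_middleware(:before_process) { events << :before }
hooked.add_middleware(:after_process) { events << :after }
doubled = hooked.process do |n|
  events << n
  n * 2
end
puts events.inspect
puts doubled.inspect
```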
Stream Processing Example
require 'stringio'
require 'jruby_parallel_processing'
stream = StringIO.new("line 1\nline 2\nline 3\n")
processor = JRubyParallelProcessing::DataProcessor.new(stream: stream, in_threads: 2)
processor.process do |line|
puts line
end
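One common way to process a stream in parallel is a bounded queue between a reader and a few worker threads, which is presumably what the queue_size option refers to. The sketch below shows that pattern in plain Ruby under that assumption. `process_stream` is a hypothetical helper, and the timeout option is left out for brevity.

```ruby
require 'stringio'

# Hypothetical helper, not part of the gem: the calling thread reads lines into
# a bounded queue while worker threads drain it.
def process_stream(stream, in_threads: 2, queue_size: 100)
  lines = SizedQueue.new(queue_size)
  results = Queue.new

  workers = Array.new(in_threads) do
    Thread.new do
      while (line = lines.pop) # nil once the queue is closed and empty
        results << yield(line)
      end
    end
  end

  # Pushing blocks while the queue is full, so a fast reader cannot
  # run far ahead of slow workers.
  stream.each_line { |line| lines << line.chomp }
  lines.close
  workers.each(&:join)

  Array.new(results.size) { results.pop }
end

input = StringIO.new("line 1\nline 2\nline 3\n")
upcased = process_stream(input, in_threads: 2, queue_size: 2) { |line| line.upcase }
puts upcased.sort.inspect
```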
TaskQueue
TaskQueue is a class for managing and executing tasks with priority, retries, and custom configurations. It allows for efficient task scheduling and error handling.
Basic Example
require 'jruby_parallel_processing'
task_queue = JRubyParallelProcessing::TaskQueue.new(max_retries: 3, retry_delay: 0.1, max_queue_size: 10)
task_queue.add_task(1) do
puts "Task executed"
end
task_queue.process_tasks
Features
- Adds tasks to the queue with priority.
- Retries failed tasks with configurable retry count and delay.
- Handles task execution using a fixed thread pool.
Configuration
- max_retries: Maximum number of retries for failed tasks (default is 3).
- retry_delay: Delay between retries in seconds (default is 0.1).
- max_queue_size: Maximum number of tasks in the queue (default is 10).
- logger: Logger object for custom logging.
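The interaction between priority, max_retries, and retry_delay can be sketched as follows. This is an illustration, not the gem's TaskQueue: `SimpleTaskQueue` is a hypothetical class, it runs tasks sequentially rather than on a thread pool, and it assumes that a larger number means a higher priority.

```ruby
# Hypothetical sketch, not the gem's TaskQueue: run tasks highest priority
# first and retry a failing task up to max_retries times.
class SimpleTaskQueue
  Task = Struct.new(:priority, :seq, :block)

  def initialize(max_retries: 3, retry_delay: 0.1)
    @max_retries = max_retries
    @retry_delay = retry_delay
    @tasks = []
  end

  def add_task(priority, &block)
    @tasks << Task.new(priority, @tasks.size, block)
  end

  # Assumption: larger numbers run first; ties keep insertion order.
  def process_tasks
    ordered = @tasks.sort_by { |task| [-task.priority, task.seq] }
    @tasks = []
    ordered.map { |task| run_with_retries(task) }
  end

  private

  def run_with_retries(task)
    attempts = 0
    begin
      attempts += 1
      { status: :ok, value: task.block.call, attempts: attempts }
    rescue StandardError => e
      if attempts <= @max_retries
        sleep(@retry_delay)
        retry # re-run the begin block
      end
      { status: :failed, error: e.message, attempts: attempts }
    end
  end
end

queue = SimpleTaskQueue.new(max_retries: 2, retry_delay: 0.01)
calls = 0
queue.add_task(1) { :low }
queue.add_task(5) do
  calls += 1
  raise "flaky" if calls < 3 # fails twice, then succeeds
  :high
end
outcomes = queue.process_tasks
outcomes.each { |outcome| puts outcome.inspect }
```

With max_retries set to 2, a task gets at most three attempts in this sketch: the first run plus two retries.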
ApiRequestProcessor
ApiRequestProcessor handles parallel API requests with built-in error handling and retries.
Basic Example
require 'jruby_parallel_processing'
urls = ["https://api.example.com/data", "https://api.example.com/other"]
processor = JRubyParallelProcessing::ApiRequestProcessor.new(urls, in_threads: 4)
results = processor.process(http_method: :get) do |response|
puts "Received response: #{response.body}"
end
Features
- Parallel API requests with configurable HTTP methods, timeouts, and retries.
- Supports custom headers and parameters for requests.
- Automatically parses responses in various formats.
Configuration
- urls: Array of URLs for API requests.
- in_threads: Number of threads for parallel API requests (default is 4).
- timeout: Timeout for API requests (default is 5 seconds).
- retries: Number of retries for failed requests (default is 3).
- logger: Logger object for custom logging.
- http_method: HTTP method to use (default is GET).
- headers: Custom headers for requests.
- params: Parameters for POST/PUT requests.
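The combination of in_threads and retries can be illustrated without touching the network. In the sketch below, `fetch_all` and its `fetcher:` argument are hypothetical and not part of the gem. The fetcher is any callable that takes a URL, so a stub stands in for a real HTTP client.

```ruby
# Hypothetical sketch, not the gem's ApiRequestProcessor: fetch URLs on a small
# pool of threads and retry failed calls. `fetcher` is any callable taking a URL.
def fetch_all(urls, fetcher:, in_threads: 4, retries: 3)
  pending = Queue.new
  urls.each_with_index { |url, index| pending << [index, url] }
  pending.close

  results = Array.new(urls.size)
  lock = Mutex.new
  workers = Array.new(in_threads) do
    Thread.new do
      while (job = pending.pop)
        index, url = job
        attempts = 0
        outcome =
          begin
            attempts += 1
            { url: url, body: fetcher.call(url) }
          rescue StandardError => e
            retry if attempts <= retries
            { url: url, error: e.message }
          end
        lock.synchronize { results[index] = outcome } # keep input order
      end
    end
  end
  workers.each(&:join)
  results
end

# Stub fetcher: the "/other" URL fails on its first call only.
failures = Hash.new(0)
counter_lock = Mutex.new
stub = lambda do |url|
  first_try = counter_lock.synchronize { (failures[url] += 1) == 1 }
  raise "temporary error" if first_try && url.end_with?("/other")
  "payload for #{url}"
end

endpoints = ["https://api.example.com/data", "https://api.example.com/other"]
responses = fetch_all(endpoints, fetcher: stub, in_threads: 2, retries: 3)
responses.each { |response| puts response[:body] }
```

Passing the fetcher in as an argument keeps the retry logic testable. A real fetcher might wrap Net::HTTP and raise on non-success responses.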
DistributedWorker
DistributedWorker is a class for managing distributed tasks across multiple worker nodes. It distributes and prioritizes tasks, and it uses a heartbeat mechanism to check that workers remain active.
Basic Example
require 'jruby_parallel_processing'
worker = JRubyParallelProcessing::DistributedWorker.new("localhost", 8787)
result = worker.execute_task(-> { puts "Task executed" }, priority: 5)
puts result[:status]
# Connect to a running worker by its druby:// URL and send a heartbeat
worker_url = "druby://localhost:8787"
remote_worker = JRubyParallelProcessing::DistributedWorker.connect_to_worker(worker_url)
remote_worker.send_heartbeat
Features
- Distributed Task Execution: Distribute tasks to different worker nodes.
- Task Prioritization: Tasks are queued with priority, ensuring high-priority tasks are processed first.
- Heartbeat Mechanism: Ensures worker nodes remain active and can report their status.
- Fault-Tolerant Task Queue: Handles errors during task queueing and processing.
Configuration Options
- host: Host address for the DistributedWorker service (default is localhost).
- port: Port for the DistributedWorker service (default is 8787).
- priority: Priority of the task being queued (default is 0).
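A heartbeat check usually comes down to recording when each worker last reported in and comparing that time against a timeout. The sketch below shows only that bookkeeping, with no networking. `HeartbeatMonitor`, its `timeout:` value, and the injectable `clock:` are all hypothetical and do not describe the gem's internals.

```ruby
# Hypothetical sketch, not the gem's mechanism: remember the last heartbeat per
# worker and treat a worker as inactive once it has been silent too long.
class HeartbeatMonitor
  def initialize(timeout: 5.0, clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @timeout = timeout
    @clock = clock
    @last_seen = {}
    @lock = Mutex.new
  end

  # Called whenever a worker reports in.
  def beat(worker_id)
    @lock.synchronize { @last_seen[worker_id] = @clock.call }
  end

  # True if the worker has sent a heartbeat within the timeout window.
  def alive?(worker_id)
    @lock.synchronize do
      seen = @last_seen[worker_id]
      !seen.nil? && (@clock.call - seen) <= @timeout
    end
  end
end

# A fake clock makes the example deterministic.
now = 0.0
monitor = HeartbeatMonitor.new(timeout: 5.0, clock: -> { now })
monitor.beat("worker-1")
now = 3.0
fresh = monitor.alive?("worker-1")   # 3 seconds since the last beat
now = 10.0
stale = monitor.alive?("worker-1")   # 10 seconds since the last beat
puts "fresh=#{fresh} stale=#{stale}"
```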
License
This gem is licensed under the MIT License.