# BulkProcessor

Bulk upload data in a file (e.g. CSV), process it in the background, then send a
success or failure report.
## Installation

Add this line to your application's Gemfile:

```ruby
gem 'bulk-processor'
```

And then execute:

```
$ bundle
```

Or install it yourself as:

```
$ gem install bulk-processor
```
## Usage

### Configuration

BulkProcessor requires the following configuration.
#### Back end: ActiveJob

Include the `activejob` and back-end queueing gems in your Gemfile, e.g.

```ruby
gem 'activejob'
gem 'bulk-processor'
gem 'resque'
```

```ruby
BulkProcessor.back_end = :active_job
BulkProcessor.queue_adapter = <adapter>
```

The default `queue_adapter` is `:inline`, which skips queueing and processes
synchronously. Since this back end is built on ActiveJob, all of the adapters in
`ActiveJob::QueueAdapters` are supported, including `:resque`.
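
For example, to enqueue jobs on Resque (this sketch assumes Resque and its Redis
backing store are already configured in your app):

```ruby
BulkProcessor.back_end = :active_job
BulkProcessor.queue_adapter = :resque
```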
#### Back end: Dynosaur

Include the `dynosaur` gem in your Gemfile, e.g.

```ruby
gem 'dynosaur'
gem 'resque'
```

```ruby
BulkProcessor.back_end = :dynosaur
BulkProcessor.heroku.api_key = 'my-heroku-api-key'
BulkProcessor.heroku.app_name = 'my-heroku-app-name'
```

You will also need to require the Dynosaur rake tasks and define an
`:environment` task, e.g. in your Rakefile:

```ruby
require 'bulk_processor/back_end/dynosaur/tasks'

task :environment do
  # Load your application's environment here
end
```
#### AWS S3

The CSV file passed to BulkProcessor will be persisted on AWS S3 so that the job
can access it. This requires configuring AWS credentials, the S3 bucket in which
to store the file, and a local temp directory to hold the file locally.

```ruby
BulkProcessor.temp_directory = '/tmp'
BulkProcessor.aws.access_key_id = 'my-aws-access-key'
BulkProcessor.aws.secret_access_key = 'my-aws-secret'
BulkProcessor.aws.bucket = 'my-s3-bucket'
```
### Setting up the processor

You will need to supply a class for CSV processing. This class must respond to
the `start` instance method and the `required_columns` and `optional_columns`
class methods, and it must have the following signature for `initialize`:

```ruby
class PetCSVProcessor
  # Columns that must be present in the CSV header
  def self.required_columns
    ['species', 'name', 'age']
  end

  # Columns that may be present in the CSV header
  def self.optional_columns
    ['favorite_toy', 'talents']
  end

  def initialize(csv, payload:)
    # Save the CSV and payload for processing
  end

  def start
    # Process the CSV and report the results
  end
end
```
### Swiss Army Knife base class

To account for a common use case, a base `BulkProcessor::CSVProcessor` class is
provided, though it must be explicitly required. This base class can be
subclassed to build a CSV processor.

The base class implements the initializer and the `#start` method, and returns
an empty set for `.optional_columns`. The `#start` method iterates over each
row, processes it using a `RowProcessor`, and accumulates the results, which
are then passed off to a `Handler`. An example implementation could look like:

```ruby
require 'bulk_processor/csv_processor'

class PetCSVProcessor < BulkProcessor::CSVProcessor
  def self.required_columns
    ['species', 'name', 'age']
  end

  def self.optional_columns
    ['favorite_toy', 'talents']
  end

  def self.row_processor_class
    PetRowProcessor
  end

  def self.post_processor_class
    PetPostProcessor
  end

  def self.handler_class
    PetHandler
  end
end
```
```ruby
class PetRowProcessor < BulkProcessor::CSVProcessor::RowProcessor
  def process!
    pet = Pet.new(row)
    if pet.save
      self.successful = true
    else
      messages.concat(pet.errors.full_messages)
    end
  end

  # Values used to identify this row in the results
  def primary_keys
    ['species', 'name']
  end
end
```
```ruby
class PetPostProcessor
  attr_reader :results, :row_processors

  def initialize(row_processors)
    @row_processors = row_processors
  end

  def start
    cat_count = 0
    @results = []
    row_processors.each do |row_processor|
      cat_count += 1 if row_processor.cat?
    end

    if cat_count > 2
      @results << BulkProcessor::CSVProcessor::Result.new(
        messages: ['Too many cats!'],
        successful: false
      )
    end
  end
end
```
```ruby
class PetHandler
  attr_reader :payload, :results

  def initialize(payload:, results:)
    @payload = payload
    @results = results
  end

  # Called when all rows were processed to completion
  def complete!
    OwnerMailer.completed(results, payload)
  end

  # Called if an unrescued error halted processing
  def fail!(fatal_error)
    OwnerMailer.failed(fatal_error, payload)
  end
end
```
### Kicking off the process

```ruby
processor = BulkProcessor.new(
  key: file_name,
  stream: file_stream,
  processor_class: PetCSVProcessor,
  payload: { recipient: current_user.email }
)

if processor.start
  # The file was valid and the job was enqueued
else
  handle_invalid_file(processor.errors)
end
```
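
In a Rails app, for example, the `key` and `stream` will often come from an
uploaded file. A minimal sketch, assuming a standard multipart file-upload form
(the controller action and the `handle_invalid_file` helper are illustrative,
not part of the gem):

```ruby
# Hypothetical Rails controller action. `params[:file]` is an
# ActionDispatch::Http::UploadedFile from a multipart form.
def create
  uploaded = params[:file]
  processor = BulkProcessor.new(
    key: uploaded.original_filename,
    stream: uploaded.tempfile,
    processor_class: PetCSVProcessor,
    payload: { recipient: current_user.email }
  )
  if processor.start
    head :accepted
  else
    handle_invalid_file(processor.errors)
  end
end
```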
### Parallelization

For larger CSV files, you may wish to process rows in parallel. This gem allows
you to scale up to an arbitrary number of parallel processes by providing an
optional argument to `#start`. Doing so will cause the input CSV file to be
split into N smaller CSV files, each one processed in a separate process. Note
that if you define a boundary column (see below), the file must be sorted by
that column for the partitioning to deliver on its promise.
```ruby
processor = BulkProcessor.new(
  key: file_name,
  stream: file_stream,
  processor_class: PetCSVProcessor,
  payload: { recipient: current_user.email }
)

# Split the file and process it in 5 parallel processes
if processor.start(5)
  # The file was valid and the jobs were enqueued
else
  handle_invalid_file(processor.errors)
end
```
By default, the file will be split into equal-sized partitions. If you need all
rows with the same value for a given column to end up in the same partition,
define `.boundary_column` on the processor class to return the name of that
column. E.g., given the following CSV sorted by `pet_id`:

```
pet_id,meal,meal_date
1,kibble,2015-11-02
1,bits,2015-11-03
...
1,alpo,2015-12-31
2,alpo,2015-11-01
...
```

```ruby
class PetCSVProcessor
  def self.boundary_column
    'pet_id'
  end

  ...
end
```

This ensures that all rows for a given `pet_id` are processed in the same
partition.
Finally, to be notified of any failures in the splitting process, you can define
`.handler_class` on your processor class to return a class that implements the
Handler role. If an error is raised during splitting, `#fail!` will be called on
the Handler with the error.

```ruby
class PetCSVProcessor
  def self.handler_class
    PetHandler
  end

  ...
end
```
### BulkProcessor::CSVProcessor::Result

The result instances passed from `BulkProcessor::CSVProcessor` to the Handler
respond to the following messages:

- `#messages` [Array<String>] - zero or more messages generated when processing the row
- `#row_num` [Fixnum|nil] - the CSV row number (starting with 2), or nil if the result is from post-processing
- `#primary_attributes` [Hash] - a set of values that can be used to identify which row the messages are for; you must override `#primary_keys` to use this
- `#successful?` - true iff the processing happened with no errors
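
For instance, a Handler's `#complete!` could use these messages to build a
per-row failure report. A minimal sketch (the report format and `OwnerMailer`
are illustrative):

```ruby
def complete!
  failure_report = results.reject(&:successful?).map do |result|
    # Results produced during post-processing have no row number
    row_label = result.row_num ? "Row #{result.row_num}" : 'Post-processing'
    "#{row_label} #{result.primary_attributes}: #{result.messages.join('; ')}"
  end
  OwnerMailer.completed(failure_report, payload)
end
```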
## Development

After checking out the repo, run `bin/setup` to install dependencies. Then, run
`bin/console` for an interactive prompt that will allow you to experiment.

To install this gem onto your local machine, run `bundle exec rake install`. To
release a new version, update the version number in `version.rb`, and then run
`bundle exec rake release` to create a git tag for the version, push git
commits and tags, and push the `.gem` file to [rubygems.org](https://rubygems.org).
Contributing
- Fork it ( https://github.com/apartmentlist/bulk-processor/fork )
- Create your feature branch (
git checkout -b my-new-feature
) - Commit your changes (
git commit -am 'Add some feature'
) - Push to the branch (
git push origin my-new-feature
) - Create a new Pull Request