CsvPiper
A simple wrapper for building a pipeline-style CSV processor that makes your transforms easy to test.
Installation
Add this line to your application's Gemfile:
gem 'csv_piper'
And then execute:
$ bundle
Or install it yourself as:
$ gem install csv_piper
Usage
CsvPiper reads a CSV row by row, passing each row through a series of processors.
Requirements
- Currently source csv must have headers
Basic Usage
File.open("my/file/path", "r") do |io_stream|
  CsvPiper::Builder.new.from(io_stream).with_processors([your_processors]).build.process
end
io_stream can be any subclass of IO.
build returns an instance of CsvPiper::Piper, but you will only need this object to call process, unless you are utilising the requires_headers() method (see builder options below).
Basic Usage with Processors
Extracted from spec/end_to_end_spec.rb
# Build some processors beforehand so we can access them later
output_collector = CollectProcessedEquations.new
error_collector = CsvPiper::Processors::CollectErrors.new
# Open the csv file to get our io source
# Csv Data:
# Input 1,Process,Input 2,Result
# 1,+,1,2
File.open(File.join(File.dirname(__FILE__),"/data/csv_1.csv")) do |file|
  # Build piper
  csv_piper = CsvPiper::Builder.new.from(file)
    .requires_headers(required_headers)
    .with_processors([
      BuildEquation.new, EvaluateEquation.new, output_collector, error_collector
    ])
    .build
  # Process csv
  csv_piper.process if csv_piper.has_required_headers?
end
# Grab some output we wanted to collect (you don't have to do this, especially when processing lots of data)
output = output_collector.output
errors = error_collector.errors
class BuildEquation
  def process(source, transformed, errors)
    transformed[:equation] = [ source['Input 1'], source['Process'], source['Input 2'], '==', source['Result'] ].join(' ')
    [transformed, errors]
  end
end
class EvaluateEquation
  def process(source, transformed, errors)
    begin
      transformed[:valid] = eval(transformed[:equation]) == true
    rescue Exception
      errors.add(:equation, transformed[:equation] + ' is not valid')
    end
    [transformed, errors]
  end
end
class CollectProcessedEquations
  attr_reader :output
  def initialize
    @output = []
  end
  def process(source, transformed, errors)
    @output << {row: errors.row_index}.merge(transformed) if errors.empty?
    [transformed, errors]
  end
end
Processors
Each processor can do whatever it wants, transformation, logging, saving to a database etc.
Here is an example of a processor that passes the values from the csv straight along to the transformed output:
class PassThrough
  def process(source, transformed, errors)
    [transformed.merge(source), errors]
  end
end
- source is a frozen hash representing the row data out of the csv (with headers as keys).
- transformed is whatever has been passed on by the previous processor. The first processor will receive an empty hash.
- errors is an instance of CsvPiper::Errors::Row. This is really a convenience object for basic error collecting. You could choose to ignore it and implement your own error handling mechanisms.
If you return nil instead of [transformed, errors], all further processing of the row will be skipped.
The return value is what will be passed into transformed and errors of the next processor.
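The return-value contract can be sketched without the gem. The following is a minimal illustration, not CsvPiper's actual implementation: run_processors and SkipBlankResult are hypothetical names introduced here, and an empty array stands in for the errors object because neither processor touches it.

```ruby
# PassThrough is the processor from the example above.
class PassThrough
  def process(source, transformed, errors)
    [transformed.merge(source), errors]
  end
end

# Hypothetical processor: skips rows whose 'Result' column is blank.
class SkipBlankResult
  def process(source, transformed, errors)
    return nil if source['Result'].to_s.strip.empty?

    [transformed, errors]
  end
end

# Hypothetical driver (an assumption, not part of CsvPiper): threads the
# [transformed, errors] pair through each processor in order and stops as
# soon as one of them returns nil.
def run_processors(processors, source, errors)
  transformed = {}
  processors.each do |processor|
    result = processor.process(source, transformed, errors)
    return nil if result.nil?

    transformed, errors = result
  end
  [transformed, errors]
end

processors = [SkipBlankResult.new, PassThrough.new]
kept = run_processors(processors, { 'Input 1' => '1', 'Result' => '2' }.freeze, [])
skipped = run_processors(processors, { 'Input 1' => '1', 'Result' => '' }.freeze, [])
# kept    => [{ 'Input 1' => '1', 'Result' => '2' }, []]
# skipped => nil
```

Because each processor only depends on its three arguments, it can be exercised in a test with plain hashes like this, without opening a CSV file.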
Pre-Processors
Pre-processors work the same as processors except that their purpose is to modify the source row data that will be passed into all processors. It's useful for doing things like converting strings to primitives, removing columns etc.
They are also allowed to add errors against the row.
Here is an example of a pre-processor that converts all values to uppercase:
class UpCase
  def process(source, errors)
    transformed = source.each_with_object({}) { |(key, value), memo| memo[key] = value.upcase }
    [transformed, errors]
  end
end
- source is a hash representing the row data out of the csv, which may have been modified by a previous pre-processor.
- errors is an instance of CsvPiper::Errors::Row.
If you return nil instead of [transformed, errors], all further processing of the row will be skipped.
The return value is what will be passed into source and errors of the next pre-processor (and then the processors). The final pre-processor's value of source is passed to each processor as a frozen hash, and its value of errors is passed to the first processor.
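As a sketch of the "converting strings to primitives" use case, here is a pre-processor that turns integer-looking strings into Integers. ParseIntegers is a hypothetical name and is not shipped with the gem; it only follows the process(source, errors) signature described above.

```ruby
# Hypothetical pre-processor: converts values that look like whole numbers
# into Integers and leaves every other value (including nil) untouched.
class ParseIntegers
  INTEGER_PATTERN = /\A-?\d+\z/

  def process(source, errors)
    converted = source.each_with_object({}) do |(key, value), memo|
      memo[key] = value.is_a?(String) && value.match?(INTEGER_PATTERN) ? value.to_i : value
    end
    [converted, errors]
  end
end

row = { 'Input 1' => '1', 'Process' => '+', 'Input 2' => '-10', 'Result' => nil }
# The errors argument is not used by this pre-processor, so nil is passed for brevity.
converted, _errors = ParseIntegers.new.process(row, nil)
# converted => { 'Input 1' => 1, 'Process' => '+', 'Input 2' => -10, 'Result' => nil }
```

Building a new hash rather than mutating source keeps the pre-processor safe to call with a frozen hash.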
Error Handling
Built-in
The Errors::Row object is passed into each processor as the last parameter to process (which must pass it on) and is used to accumulate any and all errors for the particular row being processed. This is useful for collecting all errors for display to your users rather than failing on the first error (if this mode matches your use case). You can add the built-in CollectErrors processor as one of the final processors; once all rows have been processed, it lets you retrieve every error that occurred.
Add errors using errors.add(error_key, error).
You can access the row number being processed through row_index, which can be useful for displaying or logging errors.
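A validation processor built on this interface might look like the sketch below. RequireValues is a hypothetical processor, and StubRowErrors is a hypothetical stand-in for CsvPiper::Errors::Row that mimics only the calls mentioned in this README (add, empty? and row_index), so the example runs without the gem. How the real class stores errors internally is an assumption here.

```ruby
# Hypothetical stand-in for CsvPiper::Errors::Row (assumed storage: a hash of
# error key => array of messages).
class StubRowErrors
  attr_reader :row_index, :errors

  def initialize(row_index)
    @row_index = row_index
    @errors = {}
  end

  def add(error_key, error)
    (@errors[error_key] ||= []) << error
  end

  def empty?
    @errors.empty?
  end
end

# Hypothetical processor: records an error for every required column that is
# missing or blank, instead of stopping at the first problem.
class RequireValues
  def initialize(required_keys)
    @required_keys = required_keys
  end

  def process(source, transformed, errors)
    @required_keys.each do |key|
      next unless source[key].to_s.strip.empty?

      errors.add(key, "#{key} is blank on row #{errors.row_index}")
    end
    [transformed, errors]
  end
end

row_errors = StubRowErrors.new(3)
RequireValues.new(['Input 1', 'Result']).process({ 'Input 1' => '', 'Result' => nil }, {}, row_errors)
# row_errors.errors now holds one message for 'Input 1' and one for 'Result'.
```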
Do-it-yourself
You can ignore the Errors::Row passed in to each processor and handle error cases any way you like. For example, you can pass a logger object to each of your processors on construction and use it to report errors while processing rows.
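One way to do this, sketched with Ruby's standard Logger: the processor receives the logger in its constructor, logs a warning for a bad row, and returns nil to skip it. SkipNonNumericResult is a hypothetical processor, and a StringIO is used as the log destination only to keep the example self-contained.

```ruby
require 'logger'
require 'stringio'

# Hypothetical processor: logs and skips rows whose 'Result' is not an
# integer, without using the errors object at all.
class SkipNonNumericResult
  def initialize(logger)
    @logger = logger
  end

  def process(source, transformed, errors)
    unless source['Result'].to_s.match?(/\A-?\d+\z/)
      @logger.warn("Skipping row, non-numeric Result: #{source['Result'].inspect}")
      return nil
    end
    [transformed, errors]
  end
end

log_output = StringIO.new
processor = SkipNonNumericResult.new(Logger.new(log_output))
processor.process({ 'Result' => 'abc' }, {}, nil) # returns nil and logs a warning
processor.process({ 'Result' => '2' }, {}, nil)   # returns [{}, nil]
```

Injecting the logger also keeps the processor easy to test, since a test can pass in a logger that writes to a buffer it controls.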
Builder
CsvPiper provides a builder class to allow nicer creation of the piper object.
All builder options utilise the fluent interface pattern and should be followed by a call to build to get the piper instance and then process to process the csv.
Eg. CsvPiper::Builder.new.from(io).with_processors(processors).build.process
- from(io_stream): Specifies the open io stream to read csv data from.
- with_pre_processors(pre_processors): Takes an array of pre-processors which will transform each row before it is handled by processors.
- with_processors(processors): Takes an array of processors which do all the interesting domain based work.
- with_csv_options(options): Takes an options hash which is passed to CSV.new to set any options on the CSV library.
- requires_headers(headers): Takes an array of strings representing the headers that must be present in the CSV. If this build option is used and process is called on an io source that is missing a header, an exception is raised. Before calling process you should make use of the has_required_headers? check and then retrieve the missing headers through missing_headers if necessary.
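To illustrate the kind of hash with_csv_options accepts, the sketch below passes such options directly to Ruby's standard CSV.new. It does not use CsvPiper itself. The headers: true option is added only so this standalone example yields rows keyed by header; whether CsvPiper sets it for you is not stated here.

```ruby
require 'csv'
require 'stringio'

# Options of this shape are what with_csv_options would forward to CSV.new:
# a semicolon separator, and blank lines ignored.
csv_options = { col_sep: ';', skip_blanks: true }

io = StringIO.new("Input 1;Process;Input 2;Result\n1;+;1;2\n\n2;+;2;4\n")

# Assumption for this sketch only: headers are enabled explicitly.
rows = CSV.new(io, headers: true, **csv_options).map(&:to_h)
# rows => [
#   { 'Input 1' => '1', 'Process' => '+', 'Input 2' => '1', 'Result' => '2' },
#   { 'Input 1' => '2', 'Process' => '+', 'Input 2' => '2', 'Result' => '4' }
# ]
```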
Pre-made Processors
Over time we will collect a bunch of general purpose processors that anyone can use. They can be found in the lib/processors folder but here are a couple:
- Copy: Copies or maps key-values from the source row into the transformed object.
- CollectOutput: Collects the transformed object of every row that is passed through it.
- CollectErrors: Collects each non-empty errors hash against every row that is passed through it.
- CreateActiveModel: Uses the transformed object as the attributes to create a model (works with ActiveRecord models). Merges errors from the model into the row errors (assumes the ActiveModel::Errors interface).
By using CollectOutput, and to a lesser extent CollectErrors, you will start to build up objects in memory. For very large csv files you might not want to use these convenience processors, and instead create a new processor that does whatever you need with the row (e.g. log, write to db), after which the row is discarded rather than collected.
Require them explicitly if you want to use them.
Eg. require 'csv_piper/processors/collect_output'
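A streaming alternative to collecting output could look like this sketch. WriteJsonLines is a hypothetical processor, not one of the pre-made ones: it writes each error-free row to an IO as a line of JSON and keeps nothing in memory. An empty array stands in for the errors object, since only empty? is called on it.

```ruby
require 'json'
require 'stringio'

# Hypothetical processor: writes each error-free row to the given IO as one
# JSON line instead of accumulating rows in an array.
class WriteJsonLines
  def initialize(io)
    @io = io
  end

  def process(source, transformed, errors)
    @io.puts(JSON.generate(transformed)) if errors.empty?
    [transformed, errors]
  end
end

out = StringIO.new # in real use this could be an open File
writer = WriteJsonLines.new(out)
writer.process({}, { equation: '1 + 1 == 2', valid: true }, [])
# out.string => "{\"equation\":\"1 + 1 == 2\",\"valid\":true}\n"
```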
Test Support
There is a CsvMockFile object that you can use to mock up an io csv source rather than working with on-disk files in your tests. Just require 'csv_piper/test_support/csv_mock_file'.
Inspiration
Initial inspiration crystallised upon seeing Kiba. If you need to do extensive ETL (particularly if you don't have CSVs) then we strongly recommend you check it out.
Development
After checking out the repo, run bin/setup to install dependencies. Then, run rake to run the tests. You can also run bin/console for an interactive prompt that will allow you to experiment.
Contributing
Bug reports and pull requests are welcome on GitHub at https://github.com/jazzarati/csv_piper.
License
The gem is available as open source under the terms of the MIT License.