Gem Version Build Status Code Climate Test Coverage Dependency Status

CsvPiper

A simple wrapper to create a pipeline style csv processor that makes your transforms easily testable.

Installation

Add this line to your application's Gemfile:

gem 'csv_piper'

And then execute:

$ bundle

Or install it yourself as:

$ gem install csv_piper

Usage

CsvPiper handles CSV reading row by row passing each row through a series of processors.

Requirements

  • Currently source csv must have headers

Basic Usage

File.open("my/file/path", "r") do |io_stream|
    CsvPiper::Builder.new.from(io_stream).with_processors([your_processors]).build.process
end

io_stream can be any subclass of IO.

build returns an instance of CsvPiper::Piper but you will only need this object to call process unless you are utilising the requires_headers() method (see builder options below)

Basic Usage with Processors

Extracted from spec/end_to_end_spec.rb

# Build some processors beforehand so we can access them later
output_collector = CollectProcessedEquations.new
error_collector = CsvPiper::Processors::CollectErrors.new

# Open the csv file to get our io source
# Csv Data:
# Input 1,Process,Input 2,Result
# 1,+,1,2
File.open(File.join(File.dirname(__FILE__),"/data/csv_1.csv")) do |file|

    # Build piper
    csv_piper = CsvPiper::Builder.new.from(file)
        .requires_headers(required_headers)
        .with_processors([
          BuildEquation.new, EvaluateEquation.new, output_collector, error_collector
        ])
        .build

    # Process csv
    csv_piper.process if csv_piper.has_required_headers?
end

# Grab some output we wanted to collect (You don't have to do this, espicially when processing lots of data)
output = output_collector.output
errors = error_collector.errors


class BuildEquation
    def process(source, transformed, errors)
        transformed[:equation] = [ source['Input 1'], source['Process'], source['Input 2'], '==', source['Result'] ].join(' ')
        [transformed, errors]
    end
end

class EvaluateEquation
    def process(source, transformed, errors)
        begin
            transformed[:valid] = eval(transformed[:equation]) == true
        rescue Exception
            errors.add(:equation, transformed[:equation] + ' is not valid')
        end
        [transformed, errors]
    end
end

class CollectProcessedEquations
    attr_reader :output
    def initialize
        @output = []
    end

    def process(source, transformed, errors)
        @output << {row: errors.row_index}.merge(transformed) if errors.empty?
        [transformed, errors]
    end
end

Processors

Each processor can do whatever it wants, transformation, logging, saving to a database etc.

Here is an example of a processor that passes the values from the csv straight along to the transformed output:

class PassThrough
  def process(source, transformed, errors)
    [transformed.merge(source), errors]
  end
end
  • source is a frozen hash representing the row data out of the csv (with headers as keys).
  • transformed is whatever has been passed on by the previous processor. The first processor will receive an empty hash.
  • errors is an instance of CsvPiper::Errors::Row. This is really a convenience object for basic error collecting. You could choose to ignore it and implement your own error handling mechanisms.

If you return nil instead of [transformed, errors] all further processing of the row will be skipped.

Return value is what will be passed into transformed and errors of the next processor

Pre-Processors

Pre-processors work the same as processors except that their purpose is to modify the source row data that will be passed into all processors. It's useful for doing things like converting strings to primitives, removing columns etc.

They are also allowed to add errors against the row.

Here is an example of a pre-processor that converts all values to uppercase:

class UpCase
  def process(source, errors)
    transformed = source.each_with_object({}) { |(key, value), memo| memo[key] = value.upcase }
    [transformed, errors]
  end
end
  • source is a hash representing the row data out of the csv which may have been modified by a previous pre-processor
  • errors is an instance of CsvPiper::Errors::Row

If you return nil instead of [transformed, errors] all further processing of the row will be skipped.

Return value is what will be passed into source and errors of the next pre-processor (and processors). Final pre-processor value of source will be passed to each processor as a frozen hash. Final pre-processor value of errors will be passed to the first processor.

Error Handling

Built-in

The Errors::Row object is passed into each processor as the last parameter to process (which must pass it on) and is used to accumulate any and all errors for the particular row being processed. This is useful to collect all errors for display to your users rather than just failing on first error (if this mode matches your use case). You can add the built-in CollectErrors processor as one of the final processors and this will allow you to grab all the errors ever occured once processing all rows have finished if desired.

Add errors using errors.add(error_key, error).

You can access the row number being processed through row_index which can be useful for displaying or logging errors.

Do-it-yourself

You can ignore the Errors::Row passed in to each processor and just handle error cases anyway you feel like. You can pass in a logger object in the construction of each of your processors if you want to use it during processing rows to handle errors.

Builder

CsvPiper provides a builder class to allow nicer creation of the piper object.

All builder options utilise the fluent interface pattern and should be followed by a call to build to get the piper instance and then process to process the csv.

Eg. CsvPiper::Builder.new.from(io).with_processors(processors).build.process

  • from(io_stream): Specifies the open io stream to read csv data from
  • with_pre_processors(pre_processors): Takes an array of pre-processors which will transform each row before it is handled by processors
  • with_processors(processors): Takes an array of processors which do all the interesting domain based work
  • with_csv_options(options): Takes an options hash which is passed to CSV.new to set any options on the CSV library
  • requires_headers(headers): Takes an array of strings representing the headers that must be present in the CSV. If this build option is used and process is called on a io source missing a header then a exception is thrown. Before calling process you should make use of the has_required_headers? check and then retrieve the missing headers through missing_headers if necessary.

Pre-made Processors

Over time we will collect a bunch of general purpose processors that anyone can use. They can be found in the lib/processors folder but here are a couple:

  • Copy: Copies or maps key-values from the source row into the transformed object.
  • CollectOutput: Collects the transformed object of every row that is passed through it.
  • CollectErrors: Collects each non-empty errors hash against every row that is passed through it.
  • CreateActiveModel: Uses the transformed object as attributes and creates using it (Works with ActiveRecord models). Merges errors from model into row errors (Assumes ActiveModel::Errors interface).

By using CollectOutput and to a lesser extent CollectErrors you will start to build up objects in memory. For very large csv files you might not want to use these convenience processors and rather create a new processor that does whatever you need with the row (Ie. log, write to db) which will then be discarded rather than collected.

Require them explicitly if you want to use them.

Eg. require 'csv_piper/processors/collect_output'

Test Support

There is a CsvMockFile object that you can use to mock up an io csv source rather than working with on disk files for your tests. Just require 'csv_piper/test_support/csv_mock_file'.

Inspiration

Initial inspiration crystalised upon seeing Kiba. If you need to do extensive ETL (particularly if you don't have csv's) then strongly recommend you check it out.

Development

After checking out the repo, run bin/setup to install dependencies. Then, run rake to run the tests. You can also run bin/console for an interactive prompt that will allow you to experiment.

Contributing

Bug reports and pull requests are welcome on GitHub at https://github.com/jazzarati/csv_piper.

License

The gem is available as open source under the terms of the MIT License.