Class: PCSV
- Inherits:
-
Object
- Object
- PCSV
- Defined in:
- lib/pcsv.rb,
lib/pcsv/version.rb
Constant Summary collapse
- VERSION =
"0.1.2"
Class Method Summary collapse
-
.each(path, options = {}) ⇒ Object
Opens a CSV file and runs the block on each cell in parallel.
-
.map(path, options = {}) ⇒ Object
Opens a CSV file and maps the results of a block on each cell in parallel.
-
.process(action, path, options = {}) ⇒ Object
Performs a given action on each cell of a CSV file.
Class Method Details
.each(path, options = {}) ⇒ Object
Opens a CSV file and runs the block on each cell in parallel. Returns the parsed CSV table that was read from the file.
13 14 15 |
# File 'lib/pcsv.rb', line 13
#
# Opens a CSV file and runs the block on each cell in parallel.
#
# @param path [String] path of the CSV file to read.
# @param options [Hash] passed through unchanged to +process+.
# @yield [item, mutex] invoked once per cell by +process+.
# @return [Object] whatever +process+ returns.
# @raise [ArgumentError] if no block is given.
def self.each(path, options = {}, &block)
  # Capture the block explicitly: `Proc.new` without a block is an error on
  # Ruby 3.0+, so the implicit-capture form no longer works.
  raise ArgumentError, 'no block given' unless block

  process(:each, path, options, &block)
end
.map(path, options = {}) ⇒ Object
Opens a CSV file and maps the results of a block on each cell in parallel. Returns the parsed CSV table with each processed cell replaced by the block's result.
19 20 21 |
# File 'lib/pcsv.rb', line 19
#
# Opens a CSV file and maps the results of a block on each cell in parallel.
#
# @param path [String] path of the CSV file to read.
# @param options [Hash] passed through unchanged to +process+.
# @yield [item, mutex] invoked once per cell by +process+.
# @return [Object] whatever +process+ returns.
# @raise [ArgumentError] if no block is given.
def self.map(path, options = {}, &block)
  # Capture the block explicitly: `Proc.new` without a block is an error on
  # Ruby 3.0+, so the implicit-capture form no longer works.
  raise ArgumentError, 'no block given' unless block

  process(:map, path, options, &block)
end
.process(action, path, options = {}) ⇒ Object
Performs a given action on each cell of a CSV file.
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/pcsv.rb', line 24
#
# Performs a given action on each cell of a CSV file.
#
# The file is read with headers enabled, every cell is turned into a work
# item, and a pool of threads drains the work queue, calling the block once
# per item.
#
# @param action [Symbol] when +:map+, the block's return value is written
#   back into the cell; for any other value the return value is discarded.
# @param path [String] path of the CSV file to read.
# @param options [Hash] recognised keys are removed before the rest is
#   forwarded to +CSV.read+ (with +:headers+ forced to +true+):
#   * +:thread_count+ - number of worker threads (default 10).
#   * +:if+ - callable receiving the item; the cell is processed only when
#     it returns a truthy value. Without it, every cell is processed.
#   * +:on_count+ - callable receiving the number of queued cells.
#   * +:progress_bar+ - set to +false+ to disable the progress bar
#     (default +true+).
# @yield [item, mutex] +item+ is a Hash with the keys +:row+, +:row_index+,
#   +:col_index+, +:value+ (the cell as a String) and +:header+; +mutex+ is
#   the Mutex shared by all worker threads.
# @return [CSV::Table] the table returned by +CSV.read+.
# @raise [ArgumentError] if no block is given.
def self.process(action, path, options = {}, &block)
  raise ArgumentError, 'no block given' unless block

  # Work on a copy so the caller's hash is never mutated by the deletes below.
  options = options.merge(:headers => true)
  thread_count = options.delete(:thread_count) || 10
  if_proc = options.delete(:if)
  on_count_proc = options.delete(:on_count)
  show_progress = options.key?(:progress_bar) ? options.delete(:progress_bar) : true

  # Open CSV & build a worker queue.
  # CSV.read takes keyword options, so the hash must be splatted on Ruby 3+.
  csv = CSV.read(path, **options)
  headers = csv.headers
  queue = []
  csv.each_with_index do |row, row_index|
    row.fields.each_with_index do |field, col_index|
      item = {
        row: row,
        row_index: row_index,
        col_index: col_index,
        value: field.to_s,
        header: headers[col_index]
      }
      # Skip the cell only when a filter exists AND it rejects the item;
      # with no filter every cell is queued.
      next if if_proc && !if_proc.call(item)

      queue << item
    end
  end

  progress_bar = nil
  if show_progress
    progress_bar = ::ProgressBar.create(:total => queue.length, :format => '%a |%B| %E %P%%')
  end
  on_count_proc.call(queue.length) if on_count_proc

  # Launch threads and iterate over queue until it's done.
  mutex = Mutex.new
  threads = Array.new(thread_count) do
    Thread.new do
      loop do
        # Grab an item from the front of the queue.
        item = mutex.synchronize { queue.shift }
        break if item.nil?

        # Invoke the block with the row info.
        begin
          result = block.call(item, mutex)
          if action == :map
            mutex.synchronize { item[:row][item[:col_index]] = result }
          end
          mutex.synchronize { progress_bar.increment } if progress_bar
        rescue StandardError => e
          # A failing cell is reported and skipped; the worker keeps going.
          mutex.synchronize { progress_bar.clear } if progress_bar
          warn("[ERROR] #{e.message} [R#{item[:row_index]},C#{item[:col_index]}]")
        end
      end
    end
  end

  begin
    threads.each(&:join)
  rescue SystemExit, Interrupt
    # Stop the workers on exit/Ctrl-C and fall through to return the table.
    threads.each(&:kill)
  end

  csv
end