Class: EXEL::Processors::SplitProcessor

Inherits:
Object
Includes:
LoggingHelper
Defined in:
lib/exel/processors/split_processor.rb

Overview

Implements the split instruction. Used to concurrently process a large file by splitting it into small chunks to be separately processed.

Supported Context Options

  • :delete_resource Defaults to true. Set to false to preserve the original resource; otherwise it is deleted when splitting is complete.

  • :chunk_size The number of lines that each chunk should contain. Defaults to DEFAULT_CHUNK_SIZE.

  • :max_chunks Set to specify the maximum number of chunks that should be processed. The resource will not be consumed beyond this limit.
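
As a rough illustration of how these options might resolve to their defaults, the sketch below uses a plain Hash in place of the real EXEL context. The resolve_split_options helper is hypothetical and not part of the library; the fallback values mirror the defaults documented on this page.

```ruby
# Hypothetical stand-in: a plain Hash plays the role of the EXEL context.
DEFAULT_CHUNK_SIZE = 1000

# Hypothetical helper (not part of EXEL) that applies the documented defaults.
def resolve_split_options(context)
  {
    chunk_size: context[:chunk_size] || DEFAULT_CHUNK_SIZE,
    max_chunks: context[:max_chunks] || Float::INFINITY,
    # Only a missing (nil) value falls back to true; an explicit false is kept.
    delete_resource: context[:delete_resource].nil? ? true : context[:delete_resource]
  }
end

defaults = resolve_split_options({})
custom = resolve_split_options({ chunk_size: 250, max_chunks: 4, delete_resource: false })
```

Note that :delete_resource is checked with nil? rather than ||, so that an explicit false is not overwritten by the default.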

Constant Summary

DEFAULT_CHUNK_SIZE = 1000

Number of lines to include in each chunk. Can be overridden by setting :chunk_size in the context.

Methods included from LoggingHelper

#log_debug, #log_error, #log_fatal, #log_info, #log_warn, #logger

Constructor Details

#initialize(context) ⇒ SplitProcessor

The context must contain a CSV File object under the :resource key.



# File 'lib/exel/processors/split_processor.rb', line 27

def initialize(context)
  @buffer = []
  @tempfile_count = 0
  @context = context
  @file = context[:resource]
  @max_chunks = @context[:max_chunks] || Float::INFINITY
  @context[:delete_resource] = true if @context[:delete_resource].nil?
end

Instance Attribute Details

#block ⇒ Object

Returns the value of attribute block.



# File 'lib/exel/processors/split_processor.rb', line 21

def block
  @block
end

#file_name ⇒ Object

Returns the value of attribute file_name.



# File 'lib/exel/processors/split_processor.rb', line 21

def file_name
  @file_name
end

Instance Method Details

#generate_chunk(content) ⇒ Object



# File 'lib/exel/processors/split_processor.rb', line 54

def generate_chunk(content)
  @tempfile_count += 1
  chunk = Tempfile.new([chunk_filename, '.csv'])
  chunk.write(content)
  chunk.rewind

  log_info "Generated chunk # #{@tempfile_count} for file #{filename(@file)} in #{chunk.path}"
  chunk
end
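
The Tempfile pattern used by generate_chunk can be tried in isolation. The following is a minimal sketch, not the library's code: write_chunk and the 'orders_1_' basename are hypothetical names chosen for illustration. It shows that passing a two-element array to Tempfile.new sets both the filename prefix and the extension, and that rewinding lets the next reader start from the first line.

```ruby
require 'tempfile'

# Hypothetical helper mirroring the write-then-rewind pattern shown above.
def write_chunk(basename, content)
  chunk = Tempfile.new([basename, '.csv']) # [prefix, suffix] form
  chunk.write(content)
  chunk.rewind # reposition at the start so the chunk can be read back
  chunk
end

chunk = write_chunk('orders_1_', "a,1\nb,2\n")
chunk_path = chunk.path
chunk_content = chunk.read

# Tempfiles are removed on garbage collection, but explicit cleanup is safer.
chunk.close
chunk.unlink
```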

#process(callback) ⇒ Object



# File 'lib/exel/processors/split_processor.rb', line 36

def process(callback)
  process_file(callback)
  finish(callback)
ensure
  @file.close
  File.delete(@file.path) if @context[:delete_resource]
end
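
The ensure clause means the resource is closed, and deleted when :delete_resource is set, even if processing raises. The sketch below demonstrates that behavior with a hypothetical consume helper and throwaway files in a temporary directory; none of these names come from EXEL.

```ruby
require 'tmpdir'

# Hypothetical helper: yields the file, then always closes it and
# optionally deletes it, whether or not the block raised.
def consume(file, delete_resource:)
  yield file
ensure
  file.close
  File.delete(file.path) if delete_resource
end

results = {}
Dir.mktmpdir do |dir|
  kept = File.join(dir, 'kept.csv')
  removed = File.join(dir, 'removed.csv')
  [kept, removed].each { |path| File.write(path, "a,1\n") }

  # Normal completion with deletion disabled: the file survives.
  consume(File.open(kept), delete_resource: false) { |f| f.read }

  # The block fails, yet the cleanup in ensure still runs.
  begin
    consume(File.open(removed), delete_resource: true) { |_f| raise 'boom' }
  rescue RuntimeError
    # The error still propagates to the caller after cleanup.
  end

  results[:kept_exists] = File.exist?(kept)
  results[:removed_exists] = File.exist?(removed)
end
```

One consequence of this design is that a failed split still removes the original resource unless :delete_resource is set to false.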

#process_line(line, callback) ⇒ Object



# File 'lib/exel/processors/split_processor.rb', line 44

def process_line(line, callback)
  if line == :eof
    flush_buffer(callback)
  else
    @buffer << CSV.generate_line(line)

    flush_buffer(callback) if buffer_full?
  end
end
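
The buffering logic above relies on buffer_full? and flush_buffer, whose source is not shown on this page. The sketch below is one plausible reading, under the assumption that the buffer is full once it holds chunk_size lines and that flushing joins the buffered lines into one chunk. ChunkBuffer is a hypothetical class that collects chunks in an array instead of writing tempfiles and invoking a callback.

```ruby
require 'csv'

# Hypothetical, simplified model of the buffer-and-flush behavior.
class ChunkBuffer
  attr_reader :chunks

  def initialize(chunk_size)
    @chunk_size = chunk_size
    @buffer = []
    @chunks = []
  end

  # Accepts an array of fields, or :eof to flush whatever remains.
  def process_line(line)
    if line == :eof
      flush
    else
      @buffer << CSV.generate_line(line)
      flush if @buffer.size >= @chunk_size # assumed meaning of buffer_full?
    end
  end

  private

  # Assumed flush behavior: emit the buffered lines as one chunk, then reset.
  def flush
    return if @buffer.empty?

    @chunks << @buffer.join
    @buffer = []
  end
end

buffer = ChunkBuffer.new(2)
[%w[a 1], %w[b 2], %w[c 3], :eof].each { |line| buffer.process_line(line) }
```

With a chunk size of 2, the three rows produce one full chunk and one partial chunk, the latter emitted only when :eof arrives.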