Class: ETL::Processor::BulkImportProcessor

Inherits:
Processor show all
Defined in:
lib/etl/processor/bulk_import_processor.rb

Overview

Processor which is used to bulk import data into a target database. The underlying database driver from ActiveRecord must support the bulk_load method.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(control, configuration) ⇒ BulkImportProcessor

Initialize the processor.

Configuration options:

  • :file: The file to load data from

  • :target: The target database

  • :table: The table name

  • :truncate: Set to true to truncate before loading

  • :columns: The columns to load in the order they appear in the bulk data file

  • :field_separator: The field separator. Defaults to a comma

  • :line_separator: The line separator. Defaults to a newline

  • :field_enclosure: The field enclosure characters

  • :disable_keys: Set to true to disable keys before the load and re-enable them afterwards (MySQL-only optimization)

Raises:

  • (ControlError) if the :target or :table option is not specified

44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/etl/processor/bulk_import_processor.rb', line 44

# Initialize the processor.
#
# Configuration options:
# * <tt>:file</tt>: The file to load data from (required). A relative path is
#   resolved against the directory that contains the control file.
# * <tt>:target</tt>: The target database (required)
# * <tt>:table</tt>: The table name (required)
# * <tt>:truncate</tt>: Set to true to truncate before loading
# * <tt>:columns</tt>: The columns to load in the order they appear in the
#   bulk data file
# * <tt>:field_separator</tt>: The field separator. Defaults to a comma
# * <tt>:line_separator</tt>: The line separator. Defaults to a newline
# * <tt>:null_string</tt>: The string that indicates a NULL. Defaults to an
#   empty string
# * <tt>:field_enclosure</tt>: The field enclosure characters
# * <tt>:disable_keys</tt>: Set to true to disable keys before, then enable
#   after load
# * <tt>:replace</tt>: Set to true to replace existing records, not just insert
#
# Raises ControlError if :file, :target or :table is not specified.
def initialize(control, configuration)
  super
  @target = configuration[:target]

  # Pathname.new(nil) would fail with a bare TypeError; report a missing file
  # the same way as the other required options instead.
  raise ControlError, "File must be specified" unless configuration[:file]
  path = Pathname.new(configuration[:file])
  # Relative paths are anchored at the directory holding the control file.
  @file = path.absolute? ? path : Pathname.new(File.dirname(File.expand_path(control.file))) + path

  @table = configuration[:table]
  # Plain || (not ||=) so the caller's configuration hash is left untouched,
  # consistent with how every other option is read.
  @truncate = configuration[:truncate] || false
  @columns = configuration[:columns]
  @field_separator = (configuration[:field_separator] || ',')
  @line_separator = (configuration[:line_separator] || "\n")
  @null_string = (configuration[:null_string] || "")
  @field_enclosure = configuration[:field_enclosure]
  @disable_keys = configuration[:disable_keys] || false
  @replace = configuration[:replace] || false

  raise ControlError, "Target must be specified" unless @target
  raise ControlError, "Table must be specified" unless @table
end

Instance Attribute Details

#columnsObject (readonly)

Array of symbols representing the column load order



17
18
19
# File 'lib/etl/processor/bulk_import_processor.rb', line 17

# Returns the value held in @columns, which the constructor takes from
# configuration[:columns] (no default is applied).
def columns
  @columns
end

#disable_keysObject

Boolean indicating whether to disable keys before the load and re-enable them afterwards (MySQL-only optimization)



27
28
29
# File 'lib/etl/processor/bulk_import_processor.rb', line 27

# Returns the value held in @disable_keys; the constructor sets it to
# configuration[:disable_keys], or false when that option is absent.
def disable_keys
  @disable_keys
end

#field_enclosureObject

The field enclosure (defaults to nil)



21
22
23
# File 'lib/etl/processor/bulk_import_processor.rb', line 21

# Returns the value held in @field_enclosure, which the constructor takes
# from configuration[:field_enclosure] (nil when the option is absent).
def field_enclosure
  @field_enclosure
end

#field_separatorObject

The field separator (defaults to a comma)



19
20
21
# File 'lib/etl/processor/bulk_import_processor.rb', line 19

# Returns the value held in @field_separator; the constructor sets it to
# configuration[:field_separator], or ',' when that option is absent.
def field_separator
  @field_separator
end

#fileObject (readonly)

The file to load from



9
10
11
# File 'lib/etl/processor/bulk_import_processor.rb', line 9

# Returns the Pathname held in @file. The constructor builds it from
# configuration[:file]: an absolute path is kept as is, a relative one is
# appended to the directory of the expanded control file path.
def file
  @file
end

#line_separatorObject

The line separator (defaults to a newline)



23
24
25
# File 'lib/etl/processor/bulk_import_processor.rb', line 23

# Returns the value held in @line_separator; the constructor sets it to
# configuration[:line_separator], or "\n" when that option is absent.
def line_separator
  @line_separator
end

#null_stringObject

The string that indicates a NULL (defaults to an empty string)



25
26
27
# File 'lib/etl/processor/bulk_import_processor.rb', line 25

# Returns the value held in @null_string; the constructor sets it to
# configuration[:null_string], or "" when that option is absent.
def null_string
  @null_string
end

#replaceObject

replace existing records, not just insert



29
30
31
# File 'lib/etl/processor/bulk_import_processor.rb', line 29

# Returns the value held in @replace; the constructor sets it to
# configuration[:replace], or false when that option is absent.
def replace
  @replace
end

#tableObject (readonly)

The table name



13
14
15
# File 'lib/etl/processor/bulk_import_processor.rb', line 13

# Returns the value held in @table, which the constructor takes from
# configuration[:table]. The constructor raises ControlError when it is
# missing, so a constructed processor always has one.
def table
  @table
end

#targetObject (readonly)

The target database



11
12
13
# File 'lib/etl/processor/bulk_import_processor.rb', line 11

# Returns the value held in @target, which the constructor takes from
# configuration[:target]. The constructor raises ControlError when it is
# missing, so a constructed processor always has one.
def target
  @target
end

#truncateObject (readonly)

Set to true to truncate



15
16
17
# File 'lib/etl/processor/bulk_import_processor.rb', line 15

# Returns the value held in @truncate; the constructor sets it to
# configuration[:truncate], or false when that option is absent.
def truncate
  @truncate
end

Instance Method Details

#processObject

Execute the processor



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/etl/processor/bulk_import_processor.rb', line 65

# Execute the processor: bulk load +file+ into the target table.
#
# Returns immediately when ETL::Engine.skip_bulk_import is set or when the
# data file has a size of zero. Otherwise, inside one transaction on the
# target connection, the table is truncated (if +truncate+ is set) and the
# file is handed to the connection's bulk_load together with the load options.
def process
  return if ETL::Engine.skip_bulk_import
  return if File.size(file).zero?

  connection = ETL::Engine.connection(target)
  connection.transaction do
    connection.truncate(table_name) if truncate

    load_options = { :columns => columns }
    load_options[:disable_keys] = true if disable_keys
    load_options[:replace] = true if replace

    # Collect only the field settings that are set; the :fields entry is
    # attached to the load options only when at least one of them is.
    field_options = {}
    field_options[:null_string] = null_string if null_string
    field_options[:delimited_by] = field_separator if field_separator
    field_options[:enclosed_by] = field_enclosure if field_enclosure
    field_options[:terminated_by] = line_separator if line_separator
    load_options[:fields] = field_options unless field_options.empty?

    connection.bulk_load(file, table_name, load_options)
  end
end

#table_nameObject



89
90
91
# File 'lib/etl/processor/bulk_import_processor.rb', line 89

# Returns whatever ETL::Engine.table yields for +table+ on the connection
# that ETL::Engine.connection provides for +target+.
def table_name
  connection = ETL::Engine.connection(target)
  ETL::Engine.table(table, connection)
end