Class: ETL::Control::InsertUpdateDatabaseDestination

Inherits:
Destination show all
Defined in:
lib/etl/control/destination/insert_update_database_destination.rb

Overview

Destination which writes directly to a database. This is useful when you are dealing with a small amount of data. For larger amounts of data you should probably use the bulk loader if it is supported with your target database as it will use a much faster load method.

Instance Attribute Summary collapse

Attributes inherited from Destination

#append_rows, #buffer_size, #condition, #configuration, #control, #mapping, #unique

Instance Method Summary collapse

Methods inherited from Destination

class_for_name, #current_row, #errors, #write

Constructor Details

#initialize(control, configuration, mapping = {}) ⇒ InsertUpdateDatabaseDestination

Initialize the database destination

  • control: The ETL::Control::Control instance

  • configuration: The configuration Hash

  • mapping: The mapping

Configuration options:

  • :database: The database name (REQUIRED)

  • :target: The target connection (REQUIRED)

  • :table: The table to write to (REQUIRED)

  • :truncate: Set to true to truncate before writing (defaults to false)

  • :unique: Array of field names used to only insert unique records; the SCD effective date field is appended to it automatically (optional, defaults to nil)

  • :append_rows: Array of rows to append

Mapping options:

  • :order: The order of fields to write (REQUIRED)

  • :primarykey: The primary key fields used to look up an existing row and choose between insert and update (REQUIRED)

Raises:



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/etl/control/destination/insert_update_database_destination.rb', line 40

# Initialize the insert/update database destination.
#
# * <tt>control</tt>: The ETL::Control::Control instance
# * <tt>configuration</tt>: The configuration Hash
# * <tt>mapping</tt>: The mapping Hash
#
# Reads <tt>:target</tt>, <tt>:table</tt>, <tt>:truncate</tt> and
# <tt>:unique</tt> from +configuration+, and <tt>:order</tt> and
# <tt>:primarykey</tt> from +mapping+.
#
# Raises ControlError when the primary key, the order, the table or the
# target is missing (checked in that order).
def initialize(control, configuration, mapping={})
  super
  @target = configuration[:target]
  @table = configuration[:table]
  # Note: this also writes the +false+ default back into +configuration+.
  @truncate = configuration[:truncate] ||= false

  # When a uniqueness key is given, the SCD effective date field is always
  # made part of it.
  @unique = configuration[:unique] ? configuration[:unique] + [scd_effective_date_field] : configuration[:unique]
  # Guard on truthiness (not just nil) so that :unique => false is accepted
  # instead of failing with NoMethodError on false.uniq!.
  @unique.uniq! if @unique

  # An explicit order is extended with the SCD required fields; without one
  # the order is taken from the source.
  @order = mapping[:order] ? mapping[:order] + scd_required_fields : order_from_source
  @order.uniq! if @order

  @primarykey = mapping[:primarykey] ? mapping[:primarykey] + scd_required_fields : nil
  @primarykey.uniq! if @primarykey

  raise ControlError, "Primarykey required in mapping" unless @primarykey
  raise ControlError, "Order required in mapping" unless @order
  raise ControlError, "Table required" unless @table
  raise ControlError, "Target required" unless @target
end

Instance Attribute Details

#orderObject (readonly)

Specify the order from the source



15
16
17
# File 'lib/etl/control/destination/insert_update_database_destination.rb', line 15

# Returns the list of fields held in @order. The constructor builds it from
# mapping[:order] plus scd_required_fields, or from order_from_source when
# the mapping gives no order.
def order
  @order
end

#primarykeyObject (readonly)

Specify the primarykey from the source



18
19
20
# File 'lib/etl/control/destination/insert_update_database_destination.rb', line 18

# Returns the list of fields held in @primarykey. The constructor builds it
# from mapping[:primarykey] plus scd_required_fields.
def primarykey
  @primarykey
end

#tableObject (readonly)

The table



12
13
14
# File 'lib/etl/control/destination/insert_update_database_destination.rb', line 12

# Returns @table, which the constructor takes from configuration[:table].
def table
  @table
end

#targetObject (readonly)

The target connection



9
10
11
# File 'lib/etl/control/destination/insert_update_database_destination.rb', line 9

# Returns @target, which the constructor takes from configuration[:target].
def target
  @target
end

#truncateObject (readonly)

Set to true to truncate the destination table first



21
22
23
# File 'lib/etl/control/destination/insert_update_database_destination.rb', line 21

# Returns @truncate, which the constructor takes from
# configuration[:truncate] (false when that option is not set).
def truncate
  @truncate
end

Instance Method Details

#closeObject

Close the destination: add any configured append rows to the buffer, then flush the buffer



114
115
116
117
# File 'lib/etl/control/destination/insert_update_database_destination.rb', line 114

# Finish writing to this destination.
#
# If append rows are configured they are pushed onto the buffer as a single
# entry, and then the buffer is flushed. Returns whatever +flush+ returns.
def close
  if append_rows
    buffer << append_rows
  end
  flush
end

#flushObject

Flush the currently buffered data



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/etl/control/destination/insert_update_database_destination.rb', line 58

# Flush the currently buffered data.
#
# Inside a single transaction, every buffered row (the buffer is flattened
# first) that passes +row_allowed?+ gets its virtual fields added and is then
# looked up by primary key: when no matching row is found it is INSERTed,
# otherwise the existing row is UPDATEd. The fields written are those listed
# in +order+. The buffer is cleared at the end of the transaction.
#
# Raises RuntimeError when the connection adapter is not one of the
# PostgreSQL, Mysql or Mysql2 ActiveRecord adapters.
def flush
  conn.transaction do
    buffer.flatten.each do |record|
      # check to see if this row's compound key constraint already exists
      # note that the compound key constraint may not utilize virtual fields
      next unless row_allowed?(record)

      # add any virtual fields
      add_virtuals!(record)

      # WHERE clause identifying the row by its primary key fields
      key_terms = primarykey.map do |field|
        "#{conn.quote_column_name(field)} = #{conn.quote(record[field])}"
      end
      key_clause = key_terms.join(' AND ')
      quoted_table = conn.quote_table_name(table_name)

      lookup_sql = "SELECT * FROM #{quoted_table} WHERE #{key_clause}"
      ETL::Engine.logger.debug("Executing select: #{lookup_sql}")
      result = conn.execute(lookup_sql, "Select row #{current_row}")

      # Each adapter hands back a different result object, so the way to
      # detect "at least one row" depends on the adapter.
      exists = false
      case conn.class.name
      when "ActiveRecord::ConnectionAdapters::PostgreSQLAdapter",
           "ActiveRecord::ConnectionAdapters::Mysql2Adapter"
        result.each { exists = true }
      when "ActiveRecord::ConnectionAdapters::MysqlAdapter"
        result.each_hash { exists = true }
        result.free
      else
        raise "Unsupported adapter #{conn.class} for this destination"
      end

      if exists
        assignments = order.map do |field|
          "#{conn.quote_column_name(field)} = #{conn.quote(record[field])}"
        end
        update_sql = "UPDATE #{quoted_table} SET #{assignments.join(',')} WHERE #{key_clause}"
        ETL::Engine.logger.debug("Executing update: #{update_sql}")
        conn.update(update_sql, "Update row #{current_row}")
      else
        quoted_pairs = order.map do |field|
          [conn.quote_column_name(field), conn.quote(record[field])]
        end
        columns = quoted_pairs.map(&:first).join(',')
        literals = quoted_pairs.map(&:last).join(',')
        insert_sql = "INSERT INTO #{quoted_table} (#{columns}) VALUES (#{literals})"
        ETL::Engine.logger.debug("Executing insert: #{insert_sql}")
        conn.insert(insert_sql, "Insert row #{current_row}")
      end

      @current_row += 1
    end
    buffer.clear
  end
end