Class: ETL::Control::UpdateDatabaseDestination

Inherits:
Destination
Defined in:
lib/etl/control/destination/update_database_destination.rb

Overview

Destination that writes directly to a database by issuing one UPDATE statement per row. This is useful when you are dealing with a small amount of data. For larger amounts of data, prefer the bulk loader if your target database supports it, as it uses a much faster load method.

Instance Attribute Summary

Attributes inherited from Destination

#append_rows, #buffer_size, #condition, #configuration, #control, #mapping, #unique

Instance Method Summary

Methods inherited from Destination

class_for_name, #current_row, #errors, #write

Constructor Details

#initialize(control, configuration, mapping = {}) ⇒ UpdateDatabaseDestination

Initialize the database destination

  • control: The ETL::Control::Control instance

  • configuration: The configuration Hash

  • mapping: The mapping Hash (defaults to an empty Hash)

Configuration options:

  • :database: The database name (REQUIRED)

  • :target: The target connection (REQUIRED)

  • :table: The table to write to (REQUIRED)

  • :unique: Array of field names that form the compound key used to filter out duplicate rows (optional; the constructor appends the SCD effective date field)

  • :append_rows: Array of rows to append

Mapping options:

  • :order: The order of fields to write (falls back to the order from the source; an error is raised if neither is available)

  • :conditions: Array of Hashes with :field, :comp, and :value keys that form the WHERE clause of the update (REQUIRED)
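
As a rough illustration of the options above, the following sketch builds a configuration and a mapping as plain Ruby hashes. The database name, the connection name `:warehouse`, the table `customers`, the field names, and the `missing_options` helper are all hypothetical; the shape of each condition (`:field`, `:comp`, `:value`) is taken from how `#flush` reads them.

```ruby
# Hypothetical configuration; every name and value here is a placeholder.
configuration = {
  database: 'etl_example',  # hypothetical database name
  target:   :warehouse,     # hypothetical connection name
  table:    'customers'     # hypothetical table
}

# Hypothetical mapping: the fields to SET, and the conditions for the WHERE clause.
mapping = {
  order: [:first_name, :last_name],
  conditions: [
    { field: 'id', comp: '=', value: '42' }
  ]
}

# Illustrative helper (not part of the library): lists the required keys that are absent.
def missing_options(options, required)
  required.reject { |key| options[key] }
end

p missing_options(configuration, [:database, :target, :table])
p missing_options(mapping, [:order, :conditions])
```

Both calls print an empty array when every required key is present.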

Raises:

  • ControlError: if the conditions, order, table, or target is missing

# File 'lib/etl/control/destination/update_database_destination.rb', line 36

def initialize(control, configuration, mapping={})
  super
  @target = configuration[:target]
  @table = configuration[:table]
  @unique = configuration[:unique] ? configuration[:unique] + [scd_effective_date_field] : configuration[:unique]
  @unique.uniq! unless @unique.nil?
  @order = mapping[:order] ? mapping[:order] + scd_required_fields : order_from_source
  @order.uniq! unless @order.nil?
  @conditions = mapping[:conditions] ? mapping[:conditions] + scd_required_fields : nil
  @conditions.uniq! unless @conditions.nil?
  raise ControlError, "Conditions required in mapping" unless @conditions
  raise ControlError, "Order required in mapping" unless @order
  raise ControlError, "Table required" unless @table
  raise ControlError, "Target required" unless @target
end
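
The way the constructor combines a user-supplied field list with the slowly changing dimension (SCD) fields can be sketched in isolation. This is a standalone illustration rather than the library's code: `merge_fields` and the sample field names are hypothetical, standing in for the `+` and `uniq!` steps shown above.

```ruby
# Hypothetical helper mirroring the "list + extra fields, then de-duplicate" pattern.
# Returns nil when no list was supplied, as the constructor does for :conditions,
# so the caller can still detect a missing option.
def merge_fields(fields, extra)
  return nil if fields.nil?
  (fields + extra).uniq
end

scd_fields = [:effective_date, :end_date]  # placeholder for scd_required_fields

p merge_fields([:name, :effective_date], scd_fields)
p merge_fields(nil, scd_fields)
```

A field that appears in both lists is kept only once, in the position where it first occurs.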

Instance Attribute Details

#conditions ⇒ Object (readonly)

Specify the conditions from the source

# File 'lib/etl/control/destination/update_database_destination.rb', line 18

def conditions
  @conditions
end

#order ⇒ Object (readonly)

Specify the order from the source

# File 'lib/etl/control/destination/update_database_destination.rb', line 15

def order
  @order
end

#table ⇒ Object (readonly)

The table

# File 'lib/etl/control/destination/update_database_destination.rb', line 12

def table
  @table
end

#target ⇒ Object (readonly)

The target connection

# File 'lib/etl/control/destination/update_database_destination.rb', line 9

def target
  @target
end

Instance Method Details

#close ⇒ Object

Close the destination: append any configured :append_rows to the buffer, then flush

# File 'lib/etl/control/destination/update_database_destination.rb', line 88

def close
  buffer << append_rows if append_rows
  flush
end
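
Because `close` pushes the whole `append_rows` array onto the buffer as a single element, the buffer can end up nested, which is presumably why `flush` calls `buffer.flatten` before iterating. A minimal sketch with plain arrays and hypothetical rows:

```ruby
buffer = [{ id: 1 }]                  # one row already buffered
append_rows = [{ id: 2 }, { id: 3 }]  # hypothetical rows to append on close

buffer << append_rows                 # buffer now holds one row plus one nested array
rows = buffer.flatten                 # nested arrays are flattened; hashes are left intact

p rows.length
```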

#flush ⇒ Object

Flush the currently buffered data, issuing one UPDATE per allowed row inside a single transaction

# File 'lib/etl/control/destination/update_database_destination.rb', line 53

def flush
  conn.transaction do
    buffer.flatten.each do |row|
      # check to see if this row's compound key constraint already exists
      # note that the compound key constraint may not utilize virtual fields
      next unless row_allowed?(row)

      # add any virtual fields
      add_virtuals!(row)
      
      conditionsfilter = []
      conditions.each do |cond|
        c = " #{cond[:field]} #{cond[:comp]} #{cond[:value]} "
        condition = c
        begin
          condition = eval('"' + c + '"')
        rescue
        end
        conditionsfilter << condition
      end

      updatevalues = []
      order.each do |name|
        updatevalues << "#{conn.quote_column_name(name)} = #{conn.quote(row[name])}"
      end
      q = "UPDATE #{conn.quote_table_name(table_name)} SET #{updatevalues.join(',')} WHERE #{conditionsfilter.join(' AND ')}"
      ETL::Engine.logger.debug("Executing update: #{q}")
      conn.update(q, "Update row #{current_row}")
      @current_row += 1
    end
    buffer.clear
  end
end
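
To make the statement assembly in `flush` concrete, here is a standalone sketch that builds the same kind of UPDATE string without a database connection. The `quote_name` and `quote_value` helpers are simplified stand-ins for the connection's quoting methods, and the table, fields, and row are hypothetical. Unlike the source, the sketch does not `eval` the condition string.

```ruby
# Simplified stand-ins for the connection's quoting methods (assumptions, not the real API).
def quote_name(name)
  %("#{name}")
end

def quote_value(value)
  return 'NULL' if value.nil?
  "'" + value.to_s.gsub("'", "''") + "'"
end

# Builds an UPDATE statement from the ordered field names, the row, and the conditions.
def build_update(table, order, row, conditions)
  assignments = order.map { |name| "#{quote_name(name)} = #{quote_value(row[name])}" }
  filters = conditions.map { |c| "#{c[:field]} #{c[:comp]} #{c[:value]}" }
  "UPDATE #{quote_name(table)} SET #{assignments.join(',')} WHERE #{filters.join(' AND ')}"
end

row = { first_name: "Ann", last_name: "O'Neil" }         # hypothetical row
conditions = [{ field: 'id', comp: '=', value: '42' }]   # hypothetical condition

puts build_update('customers', [:first_name, :last_name], row, conditions)
```

Only the fields named in the order list end up in the SET clause, so a row may carry extra keys without affecting the statement.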