Class: ETL::Control::InsertUpdateDatabaseDestination
- Inherits:
-
Destination
- Object
- Destination
- ETL::Control::InsertUpdateDatabaseDestination
- Defined in:
- lib/etl/control/destination/insert_update_database_destination.rb
Overview
Destination which writes directly to a database. This is useful when you are dealing with a small amount of data. For larger amounts of data you should probably use the bulk loader if it is supported with your target database as it will use a much faster load method.
Instance Attribute Summary collapse
-
#order ⇒ Object
readonly
Specify the order from the source.
-
#primarykey ⇒ Object
readonly
Specify the primarykey from the source.
-
#table ⇒ Object
readonly
The table.
-
#target ⇒ Object
readonly
The target connection.
-
#truncate ⇒ Object
readonly
Set to true to truncate the destination table first.
Attributes inherited from Destination
#append_rows, #buffer_size, #condition, #configuration, #control, #mapping, #unique
Instance Method Summary collapse
-
#close ⇒ Object
Close the connection.
-
#flush ⇒ Object
Flush the currently buffered data.
-
#initialize(control, configuration, mapping = {}) ⇒ InsertUpdateDatabaseDestination
constructor
Initialize the database destination.
Methods inherited from Destination
class_for_name, #current_row, #errors, #write
Constructor Details
#initialize(control, configuration, mapping = {}) ⇒ InsertUpdateDatabaseDestination
Initialize the database destination
-
control
: The ETL::Control::Control instance -
configuration
: The configuration Hash -
mapping
: The mapping
Configuration options:
-
:database
: The database name (REQUIRED) -
:target
: The target connection (REQUIRED) -
:table
: The table to write to (REQUIRED) -
:truncate
: Set to true to truncate before writing (defaults to false) -
:unique
: Set to true to only insert unique records (defaults to false) -
:append_rows
: Array of rows to append
Mapping options:
-
:order
: The order of fields to write (REQUIRED) -
:primarykey
: The primary key of fields to select insert or update (REQUIRED)
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/etl/control/destination/insert_update_database_destination.rb', line 40 def initialize(control, configuration, mapping={}) super @target = configuration[:target] @table = configuration[:table] @truncate = configuration[:truncate] ||= false @unique = configuration[:unique] ? configuration[:unique] + [scd_effective_date_field] : configuration[:unique] @unique.uniq! unless @unique.nil? @order = mapping[:order] ? mapping[:order] + scd_required_fields : order_from_source @order.uniq! unless @order.nil? @primarykey = mapping[:primarykey] ? mapping[:primarykey] + scd_required_fields : nil @primarykey.uniq! unless @primarykey.nil? raise ControlError, "Primarykey required in mapping" unless @primarykey raise ControlError, "Order required in mapping" unless @order raise ControlError, "Table required" unless @table raise ControlError, "Target required" unless @target end |
Instance Attribute Details
#order ⇒ Object (readonly)
Specify the order from the source
15 16 17 |
# File 'lib/etl/control/destination/insert_update_database_destination.rb', line 15 def order @order end |
#primarykey ⇒ Object (readonly)
Specify the primarykey from the source
18 19 20 |
# File 'lib/etl/control/destination/insert_update_database_destination.rb', line 18 def primarykey @primarykey end |
#table ⇒ Object (readonly)
The table
12 13 14 |
# File 'lib/etl/control/destination/insert_update_database_destination.rb', line 12 def table @table end |
#target ⇒ Object (readonly)
The target connection
9 10 11 |
# File 'lib/etl/control/destination/insert_update_database_destination.rb', line 9 def target @target end |
#truncate ⇒ Object (readonly)
Set to true to truncate the destination table first
21 22 23 |
# File 'lib/etl/control/destination/insert_update_database_destination.rb', line 21 def truncate @truncate end |
Instance Method Details
#close ⇒ Object
Close the connection
114 115 116 117 |
# File 'lib/etl/control/destination/insert_update_database_destination.rb', line 114 def close buffer << append_rows if append_rows flush end |
#flush ⇒ Object
Flush the currently buffered data
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/etl/control/destination/insert_update_database_destination.rb', line 58 def flush conn.transaction do buffer.flatten.each do |row| # check to see if this row's compound key constraint already exists # note that the compound key constraint may not utilize virtual fields next unless row_allowed?(row) # add any virtual fields add_virtuals!(row) primarykeyfilter = [] primarykey.each do |name| primarykeyfilter << "#{conn.quote_column_name(name)} = #{conn.quote(row[name])}" end q = "SELECT * FROM #{conn.quote_table_name(table_name)} WHERE #{primarykeyfilter.join(' AND ')}" ETL::Engine.logger.debug("Executing select: #{q}") res = conn.execute(q, "Select row #{current_row}") none = true case conn when ActiveRecord::ConnectionAdapters::PostgreSQLAdapter res.each { none = false } when ActiveRecord::ConnectionAdapters::MysqlAdapter res.each_hash { none = false } res.free when ActiveRecord::ConnectionAdapters::Mysql2Adapter res.each { none = false } else raise "Unsupported adapter #{conn.class} for this destination" end if none names = [] values = [] order.each do |name| names << conn.quote_column_name(name) values << conn.quote(row[name]) end q = "INSERT INTO #{conn.quote_table_name(table_name)} (#{names.join(',')}) VALUES (#{values.join(',')})" ETL::Engine.logger.debug("Executing insert: #{q}") conn.insert(q, "Insert row #{current_row}") else updatevalues = [] order.each do |name| updatevalues << "#{conn.quote_column_name(name)} = #{conn.quote(row[name])}" end q = "UPDATE #{conn.quote_table_name(table_name)} SET #{updatevalues.join(',')} WHERE #{primarykeyfilter.join(' AND ')}" ETL::Engine.logger.debug("Executing update: #{q}") conn.update(q, "Update row #{current_row}") end @current_row += 1 end buffer.clear end end |