Class: RailsRedshiftReplicator::Importers::Base
- Inherits:
-
Object
- Object
- RailsRedshiftReplicator::Importers::Base
- Defined in:
- lib/rails_redshift_replicator/importers/base.rb
Direct Known Subclasses
Instance Attribute Summary collapse
-
#replication ⇒ Object
Returns the value of attribute replication.
Instance Method Summary collapse
-
#copy(table_name = replication.target_table, options = {}) ⇒ Object
Runs Redshift COPY command to import data from S3 (docs.aws.amazon.com/redshift/latest/dg/r_COPY.html).
- #copy_options ⇒ Object
-
#copy_statement(table_name, options = {}) ⇒ String
Builds the copy statement.
-
#create_side_table ⇒ Object
Creates a permanent table for later renaming.
-
#create_temp_table ⇒ Object
Creates a temporary table on redshift.
-
#drop_table(table_name = temporary_table_name) ⇒ Object
Deletes the temporary table.
-
#evaluate_history_cap ⇒ Object
History Cap has a minimum of 2.
- #file_manager ⇒ Object
-
#get_redshift_error ⇒ Object
Retrieves the last copy error for a given file on redshift.
- #import ⇒ Object
-
#import_file ⇒ String
Location of import files on s3.
-
#initialize(replication) ⇒ Base
constructor
A new instance of Base.
-
#merge_or_replace(mode:) ⇒ Object
Runs a merge or replace operation on a Redshift table. The table is replaced on a FullReplicator strategy and merged on a TimedReplicator strategy.
-
#merge_statement(target, stage) ⇒ String
Builds the merge sql statement.
-
#notify_error ⇒ Object
TODO.
-
#replace_statement(target, stage) ⇒ String
Builds the replace sql statement.
-
#temporary_table_name ⇒ String
Returns a timestamp-based name for a temporary table.
Constructor Details
#initialize(replication) ⇒ Base
Returns a new instance of Base.
6 7 8 9 |
# File 'lib/rails_redshift_replicator/importers/base.rb', line 6
# Binds the importer to a replication record.
# When the argument is blank (per `blank?`), nothing is stored and
# @replication stays unset.
#
# @param replication [Object] the replication this importer will work on
def initialize(replication)
  @replication = replication unless replication.blank?
end
Instance Attribute Details
#replication ⇒ Object
Returns the value of attribute replication.
5 6 7 |
# File 'lib/rails_redshift_replicator/importers/base.rb', line 5
# Reader for the replication stored by the constructor.
#
# @return [Object, nil] the value of @replication (nil when it was never set)
def replication
  @replication
end
Instance Method Details
#copy(table_name = replication.target_table, options = {}) ⇒ Object
Runs Redshift COPY command to import data from S3 (docs.aws.amazon.com/redshift/latest/dg/r_COPY.html)
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/rails_redshift_replicator/importers/base.rb', line 32
# Runs the Redshift COPY command to import data from S3
# (docs.aws.amazon.com/redshift/latest/dg/r_COPY.html).
#
# Any StandardError raised while copying is swallowed: the error is recorded
# on the replication instead of being re-raised.
#
# @param table_name [String] table to load into (defaults to the replication's target table)
# @param options [Hash] behaviour switches read by this method:
#   :mark_as_imported          - call `replication.imported!` when the result status is 1
#   :can_drop_target_on_error  - drop `table_name` when the copy fails
#   The hash is also forwarded untouched to #copy_statement.
# @return [Object] whatever the last executed branch returns
def copy(table_name = replication.target_table, options = {})
  begin
    RailsRedshiftReplicator.logger.info I18n.t(:importing_file, file: import_file, target_table: table_name, scope: :rails_redshift_replicator)
    result = ::RailsRedshiftReplicator.connection.exec copy_statement(table_name, options)
    replication.imported! if result.result_status == 1 && options[:mark_as_imported]
  rescue => e
    drop_table(table_name) if options[:can_drop_target_on_error]
    # A message mentioning stl_load_errors means the details live in that
    # system table, so fetch them from there; otherwise store the exception itself.
    if e.message.include?("stl_load_errors")
      get_redshift_error
      notify_error
    else
      replication.update_attribute :last_error, e.exception.inspect
    end
  end
end
#copy_options ⇒ Object
63 64 65 |
# File 'lib/rails_redshift_replicator/importers/base.rb', line 63
# Joins the configured COPY options into a single space-separated string.
# NOTE(review): the accessor name on RailsRedshiftReplicator was lost in this
# listing; it is restored as `copy_options` to match this method's name — confirm.
#
# @return [String] the option values separated by single spaces
def copy_options
  RailsRedshiftReplicator.copy_options.values.join(" ")
end
#copy_statement(table_name, options = {}) ⇒ String
Builds the copy statement
51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/rails_redshift_replicator/importers/base.rb', line 51
# Builds the COPY statement for loading #import_file into a table.
#
# @param table_name [String] table the data is copied into
# @param options [Hash] only :noload is read here; when truthy, NOLOAD is added
# @return [String] the statement collapsed onto one line by `squish`
def copy_statement(table_name, options = {})
  # CSV exports are loaded as CSV; anything else is treated as a gzipped,
  # comma-delimited file.
  format_options = replication.csv? ? "CSV" : "GZIP DELIMITER ',' ESCAPE REMOVEQUOTES"
  sql = <<-CS
    COPY #{table_name} from '#{import_file}' #{"NOLOAD" if options[:noload]}
    REGION '#{RailsRedshiftReplicator.s3_bucket_params[:region]}'
    CREDENTIALS 'aws_access_key_id=#{RailsRedshiftReplicator.aws_credentials[:key]};aws_secret_access_key=#{RailsRedshiftReplicator.aws_credentials[:secret]}'
    #{format_options} #{copy_options}
  CS
  sql.squish
end
#create_side_table ⇒ Object
Creates a permanent table for later renaming
96 97 98 |
# File 'lib/rails_redshift_replicator/importers/base.rb', line 96
# Creates a regular (non-TEMP) table named #temporary_table_name with the
# same definition as the replication's target table, for later renaming.
#
# @return [Object] the result of the connection's `exec`
def create_side_table
  ddl = "CREATE TABLE #{temporary_table_name} (LIKE #{replication.target_table})"
  RailsRedshiftReplicator.connection.exec(ddl)
end
#create_temp_table ⇒ Object
Creates a temporary table on redshift
91 92 93 |
# File 'lib/rails_redshift_replicator/importers/base.rb', line 91
# Creates a TEMP table on Redshift named #temporary_table_name with the same
# definition as the replication's target table.
#
# @return [Object] the result of the connection's `exec`
def create_temp_table
  ddl = "CREATE TEMP TABLE #{temporary_table_name} (LIKE #{replication.target_table})"
  RailsRedshiftReplicator.connection.exec(ddl)
end
#drop_table(table_name = temporary_table_name) ⇒ Object
Deletes the temporary table
147 148 149 |
# File 'lib/rails_redshift_replicator/importers/base.rb', line 147
# Drops a table if it exists.
#
# @param table_name [String] table to drop (defaults to #temporary_table_name)
# @return [Object] the result of the connection's `exec`
def drop_table(table_name = temporary_table_name)
  statement = "drop table if exists #{table_name}"
  ::RailsRedshiftReplicator.connection.exec(statement)
end
#evaluate_history_cap ⇒ Object
History Cap has a minimum of 2
16 17 18 19 20 |
# File 'lib/rails_redshift_replicator/importers/base.rb', line 16
# Trims replication history when a history cap is configured: deletes the
# records returned by `Replication.older_than(source_table, cap)`.
# Does nothing when `RailsRedshiftReplicator.history_cap` is nil or false.
#
# @return [Object, nil] the result of `delete_all`, or nil when no cap is set
def evaluate_history_cap
  cap = RailsRedshiftReplicator.history_cap
  return unless cap

  RailsRedshiftReplicator::Replication.older_than(replication.source_table, cap).delete_all
end
#file_manager ⇒ Object
22 23 24 |
# File 'lib/rails_redshift_replicator/importers/base.rb', line 22
# Lazily builds, then memoizes, the FileManager for this importer.
#
# @return [RailsRedshiftReplicator::FileManager] manager constructed with this importer
def file_manager
  return @file_manager if @file_manager

  @file_manager = RailsRedshiftReplicator::FileManager.new(self)
end
#get_redshift_error ⇒ Object
Retrieves the last copy error for a given file on redshift
73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/rails_redshift_replicator/importers/base.rb', line 73
# Retrieves the most recent load error recorded in STL_LOAD_ERRORS for the
# import file and stores it on the replication as `last_error`, formatted as
# "column=>value" pairs joined by ";".
#
# @return [Object, nil] the result of `update_attribute`, or nil when
#   STL_LOAD_ERRORS has no row for the import file
def get_redshift_error
  sql = <<-RE.squish
    SELECT filename, line_number, colname, type, raw_field_value, raw_line, err_reason
    FROM STL_LOAD_ERRORS
    WHERE filename like '%#{import_file}%'
    ORDER BY starttime desc
    LIMIT 1
  RE
  row = ::RailsRedshiftReplicator.connection.exec(sql).entries.first
  # This runs inside #copy's rescue handler, so it must not raise when
  # Redshift recorded nothing for this file.
  return if row.nil?

  # to_s: a column may come back as nil or as a non-String value.
  error = row.map { |column, value| [column, value.to_s.strip].join('=>') }.join(";")
  replication.update_attribute :last_error, error
end
#import ⇒ Object
11 12 13 |
# File 'lib/rails_redshift_replicator/importers/base.rb', line 11
# Placeholder for the import routine; this base implementation always raises.
#
# @raise [NotImplementedError] always
def import
  raise NotImplementedError
end
#import_file ⇒ String
Returns location of import files on s3.
68 69 70 |
# File 'lib/rails_redshift_replicator/importers/base.rb', line 68
# Location of the import file(s) on S3, built from the configured bucket and
# the replication's key.
#
# @return [String] an "s3://<bucket>/<key>" URL
def import_file
  bucket = RailsRedshiftReplicator.s3_bucket_params[:bucket]
  "s3://#{bucket}/#{replication.key}"
end
#merge_or_replace(mode:) ⇒ Object
Runs a merge or replace operation on a Redshift table. The table is replaced on a FullReplicator strategy and merged on a TimedReplicator strategy.
104 105 106 107 108 109 |
# File 'lib/rails_redshift_replicator/importers/base.rb', line 104
# Builds the "<mode>_statement" SQL for the target table and the staging
# table (#temporary_table_name), then executes it.
#
# @param mode [Symbol, String] prefix of the statement builder to dispatch to;
#   this listing defines #merge_statement and #replace_statement
# @return [Object] the result of the connection's `exec`
def merge_or_replace(mode:)
  target_table = replication.target_table
  staging_table = temporary_table_name
  statement = send("#{mode}_statement", target_table, staging_table)
  ::RailsRedshiftReplicator.connection.exec(statement)
end
#merge_statement(target, stage) ⇒ String
Builds the merge sql statement. First, it deletes from the target table the records that also exist in the temporary table (matched on id). Then it inserts everything from the temporary table into the target table. (docs.aws.amazon.com/redshift/latest/dg/merge-replacing-existing-rows.html)
118 119 120 121 122 123 124 125 126 127 128 129 130 |
# File 'lib/rails_redshift_replicator/importers/base.rb', line 118
# Builds the merge SQL: inside one transaction, delete the target rows whose
# id also appears in the staging table, then insert every staging row.
# (docs.aws.amazon.com/redshift/latest/dg/merge-replacing-existing-rows.html)
#
# @param target [String] name of the table being merged into
# @param stage [String] name of the staging table holding the new rows
# @return [String] the multi-statement SQL
def merge_statement(target, stage)
  <<-SQLMERGE
    begin transaction;

    delete from #{target}
    using #{stage}
    where #{target}.id = #{stage}.id;
    insert into #{target}
    select * from #{stage};

    end transaction;
  SQLMERGE
end
#notify_error ⇒ Object
TODO
87 88 |
# File 'lib/rails_redshift_replicator/importers/base.rb', line 87
# TODO: not implemented yet; currently does nothing.
#
# @return [nil]
def notify_error
end
#replace_statement(target, stage) ⇒ String
Builds the replace sql statement. (docs.aws.amazon.com/redshift/latest/dg/performing-a-deep-copy.html)
136 137 138 139 140 141 142 143 |
# File 'lib/rails_redshift_replicator/importers/base.rb', line 136
# Builds the replace SQL: inside one transaction, drop the target table and
# rename the staging table to the target's name.
# (docs.aws.amazon.com/redshift/latest/dg/performing-a-deep-copy.html)
#
# @param target [String] name of the table being replaced
# @param stage [String] name of the staging table that takes its place
# @return [String] the multi-statement SQL
def replace_statement(target, stage)
  <<-SQLREPLACE
    begin transaction;
    drop table #{target};
    alter table #{stage} rename to #{target};
    end transaction;
  SQLREPLACE
end
#temporary_table_name ⇒ String
Returns a timestamp-based name for a temporary table
153 154 155 |
# File 'lib/rails_redshift_replicator/importers/base.rb', line 153
# Name used for the temporary/staging table: "temp_<target table>_<epoch seconds>".
# Computed on first call and memoized, so the same name is returned afterwards.
#
# @return [String] the memoized table name
def temporary_table_name
  return @temp_table if @temp_table

  @temp_table = "temp_#{replication.target_table}_#{Time.now.to_i}"
end