Class: RailsRedshiftReplicator::Importers::Base

Inherits:
Object
  • Object
show all
Defined in:
lib/rails_redshift_replicator/importers/base.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(replication) ⇒ Base

Returns a new instance of Base.



6
7
8
9
# File 'lib/rails_redshift_replicator/importers/base.rb', line 6

# Builds an importer for a single replication.
#
# @param replication [RailsRedshiftReplicator::Replication, nil] the replication
#   to import; when it is blank, nothing is stored and #replication stays nil.
def initialize(replication)
  @replication = replication unless replication.blank?
end

Instance Attribute Details

#replicationObject

Returns the value of attribute replication.



5
6
7
# File 'lib/rails_redshift_replicator/importers/base.rb', line 5

# The replication record this importer works on.
#
# @return [RailsRedshiftReplicator::Replication, nil] nil when the importer was
#   constructed with a blank replication
def replication
  @replication if defined?(@replication)
end

Instance Method Details

#copy(table_name = replication.target_table, options = {}) ⇒ Object

Runs the Redshift COPY command to import data from S3 into the given table (docs.aws.amazon.com/redshift/latest/dg/r_COPY.html).

Parameters:

  • table_name (String) (defaults to: replication.target_table)

    name of the table to load into

  • options (Hash) (defaults to: {})

Options Hash (options):

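  • :mark_as_imported (Boolean)

    Whether the replication should be flagged as imported after a successful COPY

  • :noload (Boolean)

    If true, data will be validated but not imported

  • :can_drop_target_on_error (Boolean)

    If true, the target table is dropped when the COPY fails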



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/rails_redshift_replicator/importers/base.rb', line 32

# Runs the Redshift COPY command to import the replication's S3 file
# (#import_file) into +table_name+.
# (docs.aws.amazon.com/redshift/latest/dg/r_COPY.html)
#
# Errors are not re-raised: a failure whose message mentions
# "stl_load_errors" is looked up via #get_redshift_error and #notify_error;
# any other error is stored in the replication's +last_error+.
#
# @param table_name [String] table to load into (defaults to the replication's target table)
# @param options [Hash]
# @option options [Boolean] :mark_as_imported flag the replication as imported after a successful COPY
# @option options [Boolean] :noload validate the data without loading it (see #copy_statement)
# @option options [Boolean] :can_drop_target_on_error drop +table_name+ when the COPY fails
def copy(table_name = replication.target_table, options = {})
  RailsRedshiftReplicator.logger.info I18n.t(:importing_file, file: import_file, target_table: table_name, scope: :rails_redshift_replicator)
  result = ::RailsRedshiftReplicator.connection.exec copy_statement(table_name, options)
  # NOTE(review): presumably a PG::Result; status 1 is libpq's PGRES_COMMAND_OK.
  replication.imported! if result.result_status == 1 && options[:mark_as_imported]
rescue => e
  drop_table(table_name) if options[:can_drop_target_on_error]
  # Redshift points to the STL_LOAD_ERRORS system table when the data itself is rejected.
  if e.message.include?("stl_load_errors")
    get_redshift_error
    notify_error
  else
    replication.update_attribute :last_error, e.inspect
  end
end

#copy_optionsObject



63
64
65
# File 'lib/rails_redshift_replicator/importers/base.rb', line 63

# Extra COPY parameters taken from the gem configuration.
#
# @return [String] the values of RailsRedshiftReplicator.copy_options joined by single spaces
def copy_options
  fragments = RailsRedshiftReplicator.copy_options.values
  fragments.join(" ")
end

#copy_statement(table_name, options = {}) ⇒ String

Builds the Redshift COPY statement that loads the import file into the given table.

Parameters:

  • table_name (String)

    name of the table to load into

  • options (Hash) (defaults to: {})

Returns:

  • (String)

    sql statement to run



51
52
53
54
55
56
57
58
59
60
61
# File 'lib/rails_redshift_replicator/importers/base.rb', line 51

# Builds the Redshift COPY statement that loads #import_file into +table_name+.
#
# The data format is CSV when the replication is CSV, otherwise gzipped,
# comma-delimited, escaped data with quotes removed. Configured #copy_options
# are appended at the end. All whitespace runs are collapsed to single spaces.
#
# @param table_name [String] table to load into
# @param options [Hash]
# @option options [Boolean] :noload add NOLOAD (validate without loading)
# @return [String] sql statement to run
def copy_statement(table_name, options = {})
  data_format =
    if replication.csv?
      "CSV"
    else
      "GZIP DELIMITER ',' ESCAPE REMOVEQUOTES"
    end
  source = "COPY #{table_name} from '#{import_file}'"
  noload = "NOLOAD" if options[:noload]
  region = "REGION '#{RailsRedshiftReplicator.s3_bucket_params[:region]}'"
  credentials = "CREDENTIALS 'aws_access_key_id=#{RailsRedshiftReplicator.aws_credentials[:key]};aws_secret_access_key=#{RailsRedshiftReplicator.aws_credentials[:secret]}'"
  [source, noload, region, credentials, data_format, copy_options].compact.join(" ").squish
end

#create_side_tableObject

Creates a permanent table for later renaming



96
97
98
# File 'lib/rails_redshift_replicator/importers/base.rb', line 96

# Creates a permanent table named #temporary_table_name with the same
# structure as the replication's target table (CREATE TABLE ... LIKE), meant
# to be renamed later.
def create_side_table
  side_table = temporary_table_name
  template = replication.target_table
  RailsRedshiftReplicator.connection.exec("CREATE TABLE #{side_table} (LIKE #{template})")
end

#create_temp_tableObject

Creates a temporary table on redshift



91
92
93
# File 'lib/rails_redshift_replicator/importers/base.rb', line 91

# Creates a Redshift TEMP table named #temporary_table_name with the same
# structure as the replication's target table (CREATE TEMP TABLE ... LIKE).
def create_temp_table
  temp_table = temporary_table_name
  template = replication.target_table
  RailsRedshiftReplicator.connection.exec("CREATE TEMP TABLE #{temp_table} (LIKE #{template})")
end

#drop_table(table_name = temporary_table_name) ⇒ Object

Drops the given table if it exists (by default, the temporary table).

Parameters:

  • table_name (String) (defaults to: temporary_table_name)


147
148
149
# File 'lib/rails_redshift_replicator/importers/base.rb', line 147

# Drops +table_name+ on Redshift if it exists.
#
# @param table_name [String] table to drop; defaults to #temporary_table_name
def drop_table(table_name = temporary_table_name)
  statement = "drop table if exists #{table_name}"
  ::RailsRedshiftReplicator.connection.exec(statement)
end

#evaluate_history_capObject

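Deletes old replication records for this replication's source table (selected with Replication.older_than) when RailsRedshiftReplicator.history_cap is set. History Cap has a minimum of 2.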



16
17
18
19
20
# File 'lib/rails_redshift_replicator/importers/base.rb', line 16

# Deletes old replication records for this replication's source table, as
# selected by Replication.older_than, when a history cap is configured.
#
# @return [Object, nil] the result of +delete_all+, or nil when no cap is set
def evaluate_history_cap
  cap = RailsRedshiftReplicator.history_cap
  return unless cap

  RailsRedshiftReplicator::Replication.older_than(replication.source_table, cap).delete_all
end

#file_managerObject



22
23
24
# File 'lib/rails_redshift_replicator/importers/base.rb', line 22

# Memoized FileManager bound to this importer.
#
# @return [RailsRedshiftReplicator::FileManager]
def file_manager
  return @file_manager if @file_manager

  @file_manager = RailsRedshiftReplicator::FileManager.new(self)
end

#get_redshift_errorObject

Retrieves the most recent COPY error for the current import file from Redshift's STL_LOAD_ERRORS table and stores it in the replication's last_error.



73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/rails_redshift_replicator/importers/base.rb', line 73

# Looks up the most recent STL_LOAD_ERRORS entry whose filename contains
# #import_file and stores it in the replication's +last_error+, formatted as
# "column=>value" pairs joined by ";" (values are stripped of surrounding
# whitespace).
#
# When no matching row exists, a fallback message is stored instead of
# raising. This matters because this method is called from inside #copy's
# rescue clause.
def get_redshift_error
  # Double embedded single quotes so the S3 path cannot break out of the SQL literal.
  file_pattern = import_file.gsub("'", "''")
  sql = <<-RE.squish
    SELECT filename, line_number, colname, type, raw_field_value, raw_line, err_reason
    FROM STL_LOAD_ERRORS
    WHERE filename like '%#{file_pattern}%'
    ORDER BY starttime desc
    LIMIT 1
  RE
  row = ::RailsRedshiftReplicator.connection.exec(sql).entries.first
  error =
    if row
      # to_s guards against NULL columns and non-String typecasts.
      row.map { |column, value| [column, value.to_s.strip].join("=>") }.join(";")
    else
      "no stl_load_errors entry found for #{import_file}"
    end
  replication.update_attribute :last_error, error
end

#importObject

Raises:

  • (NotImplementedError)


11
12
13
# File 'lib/rails_redshift_replicator/importers/base.rb', line 11

# Entry point that each concrete importer must override.
#
# @raise [NotImplementedError] always, naming the class that lacks an implementation
def import
  raise NotImplementedError, "#{self.class} must implement #import"
end

#import_fileString

Returns location of import files on s3.

Returns:

  • (String)

    location of import files on s3



68
69
70
# File 'lib/rails_redshift_replicator/importers/base.rb', line 68

# S3 location of the file to import for this replication.
#
# @return [String] "s3://<configured bucket>/<replication key>"
def import_file
  bucket = RailsRedshiftReplicator.s3_bucket_params[:bucket]
  key = replication.key
  "s3://#{bucket}/#{key}"
end

#merge_or_replace(mode:) ⇒ Object

Runs a merge or replace operation on a Redshift table. The table is replaced under the FullReplicator strategy and merged under the TimedReplicator strategy.

Parameters:

  • :mode (Symbol)

    the operation type



104
105
106
107
108
109
# File 'lib/rails_redshift_replicator/importers/base.rb', line 104

# Runs a merge or replace of the replication's target table from the
# temporary table (#temporary_table_name) on Redshift.
#
# @param mode [Symbol, String] :merge (uses #merge_statement) or :replace
#   (uses #replace_statement)
# @raise [ArgumentError] for any other mode. The mode is no longer
#   interpolated into a method name, so arbitrary *_statement methods
#   cannot be reached.
def merge_or_replace(mode:)
  builder =
    case mode.to_s
    when "merge" then :merge_statement
    when "replace" then :replace_statement
    else raise ArgumentError, "mode must be :merge or :replace, got #{mode.inspect}"
    end
  sql = send(builder, replication.target_table, temporary_table_name)
  ::RailsRedshiftReplicator.connection.exec sql
end

#merge_statement(target, stage) ⇒ String

Builds the merge SQL statement. First, it deletes from the target table the rows whose id matches a row in the temporary table. Then it inserts every row from the temporary table into the target table, all within one transaction. (docs.aws.amazon.com/redshift/latest/dg/merge-replacing-existing-rows.html)

Parameters:

  • target (String)
  • stage (String)

    temporary table

Returns:

  • (String)

    Sql Statement



118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/rails_redshift_replicator/importers/base.rb', line 118

# Builds the SQL that merges +stage+ into +target+ inside one transaction.
# First, rows of +target+ whose +id+ matches a row in +stage+ are deleted.
# Then every row of +stage+ is inserted into +target+.
# (docs.aws.amazon.com/redshift/latest/dg/merge-replacing-existing-rows.html)
#
# NOTE(review): assumes both tables have an +id+ column identifying a row, and
# that +stage+ has the same column order as +target+, because the insert uses
# SELECT *. Confirm this holds for every staging table passed in.
#
# @param target [String] table receiving the data
# @param stage [String] temporary (staging) table holding the imported rows
# @return [String] the SQL text. The heredoc keeps its leading indentation;
#   the whitespace is part of the returned string.
def merge_statement(target, stage)
  <<-SQLMERGE
    begin transaction;

    delete from #{target}
    using #{stage}
    where #{target}.id = #{stage}.id;
    insert into #{target}
    select * from #{stage};

    end transaction;
  SQLMERGE
end

#notify_errorObject

TODO: error notification is not implemented yet; this method currently does nothing.



87
88
# File 'lib/rails_redshift_replicator/importers/base.rb', line 87

# Hook for reporting COPY load errors. It is not implemented yet (TODO) and
# currently does nothing.
#
# @return [nil]
def notify_error
  nil
end

#replace_statement(target, stage) ⇒ String

Parameters:

  • target (String)
  • stage (String)

    temporary table

Returns:

  • (String)

    Sql Statement



136
137
138
139
140
141
142
143
# File 'lib/rails_redshift_replicator/importers/base.rb', line 136

# Builds the SQL that replaces +target+ with +stage+ inside one transaction:
# +target+ is dropped, then +stage+ is renamed to take its name.
#
# NOTE(review): presumably +stage+ is the permanent side table created for
# later renaming (see #create_side_table), not a session TEMP table.
# Presumably +target+ is a bare table name, as ALTER TABLE ... RENAME TO
# expects. Confirm both with callers.
#
# @param target [String] table to replace
# @param stage [String] table whose contents become the new +target+
# @return [String] the SQL text. The heredoc keeps its leading indentation;
#   the whitespace is part of the returned string.
def replace_statement(target, stage)
  <<-SQLREPLACE
    begin transaction;
    drop table #{target};
    alter table #{stage} rename to #{target};
    end transaction;
  SQLREPLACE
end

#temporary_table_nameString

Returns a timestamp-based name for a temporary table (temp_<target table>_<unix time>), memoized per importer.

Returns:

  • (String)

    table name



153
154
155
# File 'lib/rails_redshift_replicator/importers/base.rb', line 153

# Name for the temporary/side table: "temp_<target table>_<unix timestamp>".
# It is computed once and memoized, so every call on the same importer
# returns the same name.
#
# @return [String] table name
def temporary_table_name
  return @temp_table if @temp_table

  base = replication.target_table
  @temp_table = "temp_#{base}_#{Time.now.to_i}"
end