11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
# File 'lib/redshift_etl/load.rb', line 11
# Loads the file staged on S3 into the configured Redshift table.
#
# Builds an ordered list of SQL statements and then runs them one by one over
# a single Sequel connection:
#
# 1. ensure the schema exists and make it the search path;
# 2. drop any leftover staging table and recreate it via +create_table+;
# 3. COPY the gzipped CSV (one header row skipped) from S3 into staging;
# 4. either merge staging into the target table by primary key
#    (incremental mode) or replace the target table with staging.
#
# @param config [Object] settings object; this method reads
#   +redshift_schema+, +redshift_table+, +primary_key+, +s3_bucket_name+,
#   +temp_s3_path+, +s3_access_key_id+, +s3_secret_access_key+,
#   +redshift_connection+ and +incremental_updates?+ from it.
# @return [Object] whatever the +Sequel.connect+ call returns.
def self.load_into_redshift(config)
  target = config.redshift_table
  # Resolve the staging name once, through the shared helper, so that the
  # drop, COPY, merge and rename below all refer to the same table.
  staging = staging_table_name(config)

  sql_statements = []
  sql_statements << "create schema if not exists #{config.redshift_schema};"
  sql_statements << "set search_path=#{config.redshift_schema};"
  sql_statements << "drop table if exists #{staging};"
  sql_statements << create_table(config)

  # NOTE: this statement embeds the S3 credentials in clear text, so the
  # statement text itself must never be logged or printed.
  sql_statements << [
    "",
    "copy #{staging} from 's3://#{config.s3_bucket_name}/#{config.temp_s3_path}'",
    "credentials 'aws_access_key_id=#{config.s3_access_key_id};aws_secret_access_key=#{config.s3_secret_access_key}'",
    "csv",
    "gzip",
    "ignoreheader 1",
    "trimblanks",
    "statupdate on",
    "truncatecolumns",
    "timeformat 'auto'; "
  ].join("\n")

  if config.incremental_updates?
    # Merge: inside one transaction, delete the target rows whose primary key
    # also appears in staging, append every staging row, then drop staging.
    sql_statements << [
      "",
      "begin;",
      create_table_statement(config, target).to_s,
      "delete from #{target}",
      "using #{staging}",
      "where #{target}.#{config.primary_key} = #{staging}.#{config.primary_key};",
      "insert into #{target}",
      "select * from #{staging};",
      "drop table #{staging};",
      "commit;",
      ""
    ].join("\n")
  else
    # Full refresh: replace the target table with the freshly loaded staging table.
    # NOTE(review): these two statements are not wrapped in a transaction, so
    # the target table is briefly absent between them - confirm this is acceptable.
    sql_statements << "drop table if exists #{target};"
    sql_statements << "alter table #{staging} rename to #{target};"
  end

  # NOTE(review): the extra connection options presumably stop Sequel's
  # postgres adapter from issuing SET commands that Redshift rejects - confirm.
  Sequel.connect(config.redshift_connection + "?force_standard_strings=f&client_min_messages=") do |db|
    sql_statements.each do |statement|
      # Execute the raw SQL and print the first value it returns, if any.
      print db[statement].get
    end
  end
end
|