Module: ThreeScale::Backend::Analytics::Redshift::Adapter::SQL

Defined in:
lib/3scale/backend/analytics/redshift/adapter.rb

Constant Summary

SCHEMA =
'backend'.freeze
TABLES =

The importer relies on these tables and views, which must already exist in Redshift for it to work correctly (a sketch of their assumed shape follows the constant summary).

{ events: "#{SCHEMA}.events".freeze,
  latest_s3_path_read: "#{SCHEMA}.latest_s3_path_read".freeze,
  temp: "#{SCHEMA}.temp_events".freeze,
  unique_imported_events: "#{SCHEMA}.unique_imported_events".freeze }.freeze
EVENT_ATTRS =
%w(service cinstance uinstance metric period timestamp time_gen).freeze
JOIN_EVENT_ATTRS =
(EVENT_ATTRS - ['time_gen']).freeze
EXISTING_TABLES =
'SELECT table_name '\
'FROM information_schema.tables '\
"WHERE table_schema = '#{SCHEMA}';".freeze
CREATE_TEMP_TABLES =
"DROP TABLE IF EXISTS #{TABLES[:temp]} CASCADE; "\
"CREATE TABLE #{TABLES[:temp]} (LIKE #{TABLES[:events]}); "\
"DROP TABLE IF EXISTS #{TABLES[:unique_imported_events]} CASCADE; "\
"CREATE TABLE #{TABLES[:unique_imported_events]} (LIKE #{TABLES[:events]}); "\
'COMMIT;'.freeze
CLEAN_TEMP_TABLES =
"DROP TABLE #{TABLES[:unique_imported_events]}; "\
"DROP TABLE #{TABLES[:temp]};".freeze
LATEST_TIMESTAMP_READ =
"SELECT s3_path FROM #{TABLES[:latest_s3_path_read]}".freeze
VACUUM =
"VACUUM FULL #{TABLES[:events]}".freeze

Class Method Summary

.delete_nulls_from_imported ⇒ Object
.delete_outdated_from_unique_imported ⇒ Object
.duplicated_events ⇒ Object
.fill_table_unique_imported ⇒ Object
.import_s3_path(path, access_key_id, secret_access_key) ⇒ Object
.insert_imported_events ⇒ Object
.store_timestamp_read(timestamp) ⇒ Object

Class Method Details

.delete_nulls_from_imported ⇒ Object



# File 'lib/3scale/backend/analytics/redshift/adapter.rb', line 144

def delete_nulls_from_imported
  # Blank out NULL cinstance/uinstance in the temp table: the equality joins
  # built by join_comparisons would never match rows where both sides are NULL.
  attrs_with_nulls = %w(cinstance uinstance)
  attrs_with_nulls.map do |attr|
    replace_nulls(TABLES[:temp], attr, '')
  end.join(' ')
end
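
replace_nulls is a private helper that is not shown on this page. Judging from its arguments (table, attribute, replacement), the joined result is presumably a pair of UPDATE statements along these lines; this is an assumption about the helper's output, not its verbatim rendering:

# Assumed rendering of delete_nulls_from_imported (replace_nulls not shown):
"UPDATE backend.temp_events SET cinstance = '' WHERE cinstance IS NULL; "\
"UPDATE backend.temp_events SET uinstance = '' WHERE uinstance IS NULL;"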

.delete_outdated_from_unique_imported ⇒ Object

Once we have imported some events and kept only the most recent version of each (see .fill_table_unique_imported), we still need to delete the ones that do not need to be imported: those whose time_gen is older than, or equal to, that of the same event already stored in the events table.



# File 'lib/3scale/backend/analytics/redshift/adapter.rb', line 124

def delete_outdated_from_unique_imported
  "DELETE FROM #{TABLES[:unique_imported_events]} "\
    'USING (SELECT * '\
    "FROM #{TABLES[:events]} e "\
    'WHERE e.time_gen >= (SELECT MIN(time_gen) '\
    "FROM #{TABLES[:unique_imported_events]})) AS e "\
    "WHERE #{join_comparisons(
      TABLES[:unique_imported_events], 'e', JOIN_EVENT_ATTRS)} "\
    "AND (#{TABLES[:unique_imported_events]}.time_gen <= e.time_gen);".freeze
end
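
As a quick illustration of the predicate (hypothetical data): two rows that match on every attribute in JOIN_EVENT_ATTRS compare only on time_gen:

existing_time_gen = '2016-01-01 10:07:00' # already in backend.events
imported_time_gen = '2016-01-01 10:05:00' # just arrived from S3
# ISO-8601 strings compare chronologically; the imported row satisfies
# time_gen <= existing time_gen, so it is deleted as outdated.
imported_time_gen <= existing_time_gen # => true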

.duplicated_events ⇒ Object



# File 'lib/3scale/backend/analytics/redshift/adapter.rb', line 156

def duplicated_events
  # Returns a query that counts how many groups of events share all the
  # attributes in JOIN_EVENT_ATTRS. After a correct import it returns 0.
  'SELECT COUNT(*) '\
    'FROM (SELECT COUNT(*) AS count '\
    "FROM #{TABLES[:events]} "\
    "GROUP BY #{JOIN_EVENT_ATTRS.join(',')}) AS group_counts "\
    'WHERE group_counts.count > 1;'
end
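
This query is a consistency check: after a correct import, each group of attributes in JOIN_EVENT_ATTRS should appear exactly once. A minimal sketch of how one might run it, assuming an already-established PG::Connection (Redshift speaks the Postgres wire protocol, so the pg gem works; the `redshift` connection is hypothetical):

require 'pg'

# `redshift` is a hypothetical, already-established PG::Connection.
result = redshift.exec(SQL.duplicated_events)
dups = result.getvalue(0, 0).to_i
raise "events table contains #{dups} duplicated groups" unless dups.zero?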

.fill_table_unique_imported ⇒ Object

In order to get unique events, we inner-join the table with itself. There might be several rows with the same service, cinstance, uinstance, metric, period, and timestamp but different time_gen and value. Of those rows we want only the one with the highest time_gen. We cannot simply take the one with the highest value, because we support SET operations: a value of '0' can be more recent than '50'.

The solution works as follows: find the max time_gen within each group of 'repeated' events, then inner-join back to the table to select the row carrying that most recent data (see the worked example after the code below).

Note that we only select events with period != 'minute' and service = master. This is what the dashboard project requires. We will need to change this when we start importing data into a Redshift cluster that serves as a source for the stats API.



# File 'lib/3scale/backend/analytics/redshift/adapter.rb', line 103

def fill_table_unique_imported
  "INSERT INTO #{TABLES[:unique_imported_events]} "\
    'SELECT e.service, e.cinstance, e.uinstance, e.metric, e.period, '\
    'e.timestamp, e.time_gen, e.value '\
    'FROM '\
    '(SELECT service, cinstance, uinstance, metric, period, '\
    'MAX(time_gen) AS max_time_gen, timestamp '\
    "FROM #{TABLES[:temp]} "\
    "WHERE period != 'minute' AND service = '#{master_service}' "\
    'GROUP BY service, cinstance, uinstance, metric, period, timestamp) AS e1 '\
    "INNER JOIN #{TABLES[:temp]} e "\
    "ON #{join_comparisons('e', 'e1', JOIN_EVENT_ATTRS)} "\
    'AND e.time_gen = e1.max_time_gen ' \
    'GROUP BY e.service, e.cinstance, e.uinstance, e.metric, e.period, '\
    'e.timestamp, e.time_gen, e.value'.freeze
end
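
A worked example of why MAX(time_gen) is the right pick (hypothetical rows sharing the same service, cinstance, uinstance, metric, period, and timestamp):

rows = [
  { time_gen: '2016-01-01 10:00:00', value: 50 }, # earlier report
  { time_gen: '2016-01-01 10:05:00', value: 0 }   # later SET that reset the value
]
# The subquery computes max_time_gen = '2016-01-01 10:05:00'; the inner join
# then keeps only the matching row, i.e. value = 0, the most recent state:
rows.max_by { |r| r[:time_gen] }[:value] # => 0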

.import_s3_path(path, access_key_id, secret_access_key) ⇒ Object



# File 'lib/3scale/backend/analytics/redshift/adapter.rb', line 135

def import_s3_path(path, access_key_id, secret_access_key)
  "COPY #{TABLES[:temp]} "\
    "FROM '#{path}' "\
    "CREDENTIALS '#{amazon_credentials(access_key_id,
                                       secret_access_key)}' "\
    "FORMAT AS JSON 'auto' "\
    "TIMEFORMAT 'auto';"
end
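
For reference, with hypothetical arguments the generated statement renders roughly as below. amazon_credentials is a private helper not shown here; the key=value CREDENTIALS format is the standard Redshift one and an assumption about that helper's output:

SQL.import_s3_path('s3://a-bucket/events/2016010100.json',
                   'AKIA_EXAMPLE', 'SECRET_EXAMPLE')
# => "COPY backend.temp_events FROM 's3://a-bucket/events/2016010100.json' "\
#    "CREDENTIALS 'aws_access_key_id=AKIA_EXAMPLE;aws_secret_access_key=SECRET_EXAMPLE' "\
#    "FORMAT AS JSON 'auto' TIMEFORMAT 'auto';"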

.insert_imported_events ⇒ Object



# File 'lib/3scale/backend/analytics/redshift/adapter.rb', line 74

def insert_imported_events
  # Redshift merge pattern: within one transaction, delete the rows in the
  # events table that are superseded by a newer imported version, then append
  # everything left in unique_imported_events.
  'BEGIN TRANSACTION; '\
    "DELETE FROM #{TABLES[:events]} "\
    "USING #{TABLES[:unique_imported_events]} u "\
    "WHERE #{TABLES[:events]}.timestamp >= "\
    "(SELECT MIN(timestamp) FROM #{TABLES[:unique_imported_events]}) "\
    "AND #{join_comparisons(TABLES[:events], 'u', JOIN_EVENT_ATTRS)} "\
    "AND (#{TABLES[:events]}.time_gen < u.time_gen); "\
    "INSERT INTO #{TABLES[:events]} "\
    "SELECT * FROM #{TABLES[:unique_imported_events]};" \
    'END TRANSACTION;'.freeze
end

.store_timestamp_read(timestamp) ⇒ Object



# File 'lib/3scale/backend/analytics/redshift/adapter.rb', line 151

def store_timestamp_read(timestamp)
  # latest_s3_path_read holds a single row: the S3 path of the last batch read.
  "DELETE FROM #{TABLES[:latest_s3_path_read]}; "\
    "INSERT INTO #{TABLES[:latest_s3_path_read]} VALUES ('#{timestamp}');"
end
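
Putting the pieces together, one import cycle presumably looks something like the sketch below. This is not the adapter's actual control flow; the connection parameters, S3 path, and step ordering are assumptions:

require 'pg'

SQL = ThreeScale::Backend::Analytics::Redshift::Adapter::SQL

# Hypothetical connection; Redshift listens on the Postgres protocol (5439).
redshift = PG.connect(host: 'cluster.example.com', port: 5439,
                      dbname: 'backend', user: 'importer',
                      password: ENV['REDSHIFT_PASSWORD'])

redshift.exec(SQL::CREATE_TEMP_TABLES)
redshift.exec(SQL.import_s3_path('s3://a-bucket/events/2016010100',
                                 ENV['AWS_ACCESS_KEY_ID'],
                                 ENV['AWS_SECRET_ACCESS_KEY']))
redshift.exec(SQL.delete_nulls_from_imported)           # normalize NULLs in temp
redshift.exec(SQL.fill_table_unique_imported)           # dedup within the batch
redshift.exec(SQL.delete_outdated_from_unique_imported) # drop superseded rows
redshift.exec(SQL.insert_imported_events)               # merge into backend.events
redshift.exec(SQL.store_timestamp_read('2016010100'))
redshift.exec(SQL::CLEAN_TEMP_TABLES)
redshift.exec(SQL::VACUUM)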