Module: ThreeScale::Backend::Analytics::Redshift::Adapter::SQL
- Defined in:
- lib/3scale/backend/analytics/redshift/adapter.rb
Constant Summary
- SCHEMA =
'backend'.freeze
- TABLES =
This importer relies on tables and views that must already exist in Redshift in order to function correctly.
{ events: "#{SCHEMA}.events".freeze,
  latest_s3_path_read: "#{SCHEMA}.latest_s3_path_read".freeze,
  temp: "#{SCHEMA}.temp_events".freeze,
  unique_imported_events: "#{SCHEMA}.unique_imported_events".freeze }.freeze
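The source does not show the schema of these tables. As an illustration only, based on the columns the queries below reference (the EVENT_ATTRS plus a value column), a hypothetical DDL sketch might look like this; the column types here are assumptions, not taken from the adapter:

```ruby
# Hypothetical DDL for the events table this importer expects in Redshift.
# Column names come from EVENT_ATTRS plus the 'value' column used by the
# queries; the types are illustrative assumptions.
CREATE_EVENTS_TABLE = <<~SQL
  CREATE TABLE IF NOT EXISTS backend.events (
    service    VARCHAR(32),
    cinstance  VARCHAR(64),
    uinstance  VARCHAR(64),
    metric     VARCHAR(32),
    period     VARCHAR(16),
    timestamp  TIMESTAMP,
    time_gen   TIMESTAMP,
    value      BIGINT
  );
SQL
```

The temp and unique_imported_events tables are created with `LIKE backend.events` (see CREATE_TEMP_TABLES), so they share whatever layout the events table has.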
- EVENT_ATTRS =
%w(service cinstance uinstance metric period timestamp time_gen).freeze
- JOIN_EVENT_ATTRS =
(EVENT_ATTRS - ['time_gen']).freeze
- EXISTING_TABLES =
'SELECT table_name '\
'FROM information_schema.tables '\
"WHERE table_schema = '#{SCHEMA}';".freeze
- CREATE_TEMP_TABLES =
"DROP TABLE IF EXISTS #{TABLES[:temp]} CASCADE; "\
"CREATE TABLE #{TABLES[:temp]} (LIKE #{TABLES[:events]}); "\
"DROP TABLE IF EXISTS #{TABLES[:unique_imported_events]} CASCADE; "\
"CREATE TABLE #{TABLES[:unique_imported_events]} (LIKE #{TABLES[:events]}); "\
'COMMIT;'.freeze
- CLEAN_TEMP_TABLES =
"DROP TABLE #{TABLES[:unique_imported_events]}; "\
"DROP TABLE #{TABLES[:temp]};".freeze
- LATEST_TIMESTAMP_READ =
"SELECT s3_path FROM #{TABLES[:latest_s3_path_read]}".freeze
- VACUUM =
"VACUUM FULL #{TABLES[:events]}".freeze
Class Method Summary
- .delete_nulls_from_imported ⇒ Object
- .delete_outdated_from_unique_imported ⇒ Object
Once we have imported some events and made sure that we have kept only the most recent ones, we need to delete the ones that should not be imported.
- .duplicated_events ⇒ Object
- .fill_table_unique_imported ⇒ Object
To get unique events, we use an inner join of the table with itself.
- .import_s3_path(path, access_key_id, secret_access_key) ⇒ Object
- .insert_imported_events ⇒ Object
- .store_timestamp_read(timestamp) ⇒ Object
Class Method Details
.delete_nulls_from_imported ⇒ Object
# File 'lib/3scale/backend/analytics/redshift/adapter.rb', line 144

def delete_nulls_from_imported
  attrs_with_nulls = %w(cinstance uinstance)
  attrs_with_nulls.map do |attr|
    replace_nulls(TABLES[:temp], attr, '')
  end.join(' ')
end
.delete_outdated_from_unique_imported ⇒ Object
Once we have imported some events and made sure that we have kept only the most recent ones, we need to delete the ones that should not be imported: those whose time_gen is not newer than that of the same event already present in the events table.
# File 'lib/3scale/backend/analytics/redshift/adapter.rb', line 124

def delete_outdated_from_unique_imported
  "DELETE FROM #{TABLES[:unique_imported_events]} "\
  'USING (SELECT * '\
  "FROM #{TABLES[:events]} e "\
  'WHERE e.time_gen >= (SELECT MIN(time_gen) '\
  "FROM #{TABLES[:unique_imported_events]})) AS e "\
  "WHERE #{join_comparisons(
      TABLES[:unique_imported_events], 'e', JOIN_EVENT_ATTRS)} "\
  "AND (#{TABLES[:unique_imported_events]}.time_gen <= e.time_gen);".freeze
end
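The deletion rule above can be sketched in plain Ruby: an imported row is discarded when the events table already holds the same event with an equal or newer time_gen. This in-memory version is illustrative only (the `drop_outdated` helper is hypothetical, not part of the adapter):

```ruby
# In-memory sketch of the outdated-row filter (illustrative only).
# An imported event is kept only when no event with the same identifying
# attributes and an equal or newer time_gen already exists.
JOIN_KEYS = %i(service cinstance uinstance metric period timestamp)

def drop_outdated(imported, existing)
  imported.reject do |row|
    existing.any? do |e|
      JOIN_KEYS.all? { |k| e[k] == row[k] } && e[:time_gen] >= row[:time_gen]
    end
  end
end

existing = [{ service: 'm', cinstance: 'c', uinstance: 'u', metric: 'hits',
              period: 'hour', timestamp: 't0', time_gen: 10, value: 50 }]
imported = [
  { service: 'm', cinstance: 'c', uinstance: 'u', metric: 'hits',
    period: 'hour', timestamp: 't0', time_gen: 5,  value: 40 }, # older: dropped
  { service: 'm', cinstance: 'c', uinstance: 'u', metric: 'hits',
    period: 'hour', timestamp: 't0', time_gen: 20, value: 0 }   # newer: kept
]
```

Note the `>=` comparison, mirroring the SQL's `time_gen <= e.time_gen`: a tie means the events table already has this exact generation, so the imported copy is redundant.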
.duplicated_events ⇒ Object
# File 'lib/3scale/backend/analytics/redshift/adapter.rb', line 156

def duplicated_events
  'SELECT COUNT(*) '\
  'FROM (SELECT COUNT(*) AS count '\
  "FROM #{TABLES[:events]} "\
  "GROUP BY #{JOIN_EVENT_ATTRS.join(',')}) AS group_counts "\
  'WHERE group_counts.count > 1;'
end
.fill_table_unique_imported ⇒ Object
To get unique events, we use an inner join of the table with itself. There might be several rows with the same service, cinstance, uinstance, metric, period, and timestamp but different time_gen and value. From those rows, we want to keep just the one with the highest time_gen. We cannot keep the one with the highest value because we support SET operations, which means a value of '0' can be more recent than '50'.

The way to solve this is as follows: find the max time_gen while grouping the 'repeated' events, then perform an inner join to select the row with the most recent data.

Note that we only take events with period != 'minute' and service = master. This is what the dashboard project requires. We will need to change this when we start importing data into a Redshift cluster used as a source for the stats API.
# File 'lib/3scale/backend/analytics/redshift/adapter.rb', line 103

def fill_table_unique_imported
  "INSERT INTO #{TABLES[:unique_imported_events]} "\
  'SELECT e.service, e.cinstance, e.uinstance, e.metric, e.period, '\
  'e.timestamp, e.time_gen, e.value '\
  'FROM '\
  '(SELECT service, cinstance, uinstance, metric, period, '\
  'MAX(time_gen) AS max_time_gen, timestamp '\
  "FROM #{TABLES[:temp]} "\
  "WHERE period != 'minute' AND service = '#{master_service}' "\
  'GROUP BY service, cinstance, uinstance, metric, period, timestamp) AS e1 '\
  "INNER JOIN #{TABLES[:temp]} e "\
  "ON #{join_comparisons('e', 'e1', JOIN_EVENT_ATTRS)} "\
  'AND e.time_gen = e1.max_time_gen '\
  'GROUP BY e.service, e.cinstance, e.uinstance, e.metric, e.period, '\
  'e.timestamp, e.time_gen, e.value'.freeze
end
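The grouping-and-max logic above can be expressed in plain Ruby as an illustrative sketch (the `unique_most_recent` helper is hypothetical, not part of the adapter): group rows by the identifying attributes and keep, per group, the row with the highest time_gen, even when its value is lower:

```ruby
# Illustrative dedup: among rows sharing the same identifying attributes,
# keep only the one with the most recent time_gen (NOT the highest value,
# because a SET operation can legitimately lower a counter).
GROUP_KEYS = %i(service cinstance uinstance metric period timestamp)

def unique_most_recent(rows)
  rows.group_by { |r| r.values_at(*GROUP_KEYS) }
      .map { |_, group| group.max_by { |r| r[:time_gen] } }
end

rows = [
  { service: 'm', cinstance: 'c', uinstance: 'u', metric: 'hits',
    period: 'hour', timestamp: 't0', time_gen: 1, value: 50 },
  # A later SET can make a *newer* row carry a *smaller* value:
  { service: 'm', cinstance: 'c', uinstance: 'u', metric: 'hits',
    period: 'hour', timestamp: 't0', time_gen: 2, value: 0 }
]
```

Here the row with value 0 wins because its time_gen is higher, which is exactly why the SQL joins on `MAX(time_gen)` rather than `MAX(value)`.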
.import_s3_path(path, access_key_id, secret_access_key) ⇒ Object
# File 'lib/3scale/backend/analytics/redshift/adapter.rb', line 135

def import_s3_path(path, access_key_id, secret_access_key)
  "COPY #{TABLES[:temp]} "\
  "FROM '#{path}' "\
  "CREDENTIALS '#{amazon_credentials(access_key_id, secret_access_key)}' "\
  "FORMAT AS JSON 'auto' "\
  "TIMEFORMAT 'auto';"
end
.insert_imported_events ⇒ Object
# File 'lib/3scale/backend/analytics/redshift/adapter.rb', line 74

def insert_imported_events
  'BEGIN TRANSACTION; '\
  "DELETE FROM #{TABLES[:events]} "\
  "USING #{TABLES[:unique_imported_events]} u "\
  "WHERE #{TABLES[:events]}.timestamp >= "\
  "(SELECT MIN(timestamp) FROM #{TABLES[:unique_imported_events]}) "\
  "AND #{join_comparisons(TABLES[:events], 'u', JOIN_EVENT_ATTRS)} "\
  "AND (#{TABLES[:events]}.time_gen < u.time_gen); "\
  "INSERT INTO #{TABLES[:events]} "\
  "SELECT * FROM #{TABLES[:unique_imported_events]};"\
  'END TRANSACTION;'.freeze
end