Class: ThreeScale::Backend::Analytics::Redshift::Adapter
Inherits: Object
Defined in: lib/3scale/backend/analytics/redshift/adapter.rb
Overview
This class imports the events stored by Kinesis in S3 into Redshift. It keeps track of the events that have been imported so that it does not read the same S3 path twice.
We store 'repeated' events in S3. This means that the same (cinstance, uinstance, metric, period, timestamp) combination can appear several times.
In order to avoid storing repeated information in Redshift we need to perform UPSERTs. The algorithm followed is the one explained in the official Redshift documentation: docs.aws.amazon.com/redshift/latest/dg/t_updating-inserting-using-staging-tables-.html The process is as follows:
1) Create a temporary table with the data imported from S3, including duplicates. Two attributes can have nulls: cinstance and uinstance. We replace those nulls with ''. I have observed substantial performance gains because of this.
2) Perform the necessary operations in the temp table to remove duplicates (in our case this basically consists of an inner join).
3) Inside a transaction, delete from the final table all the events that are in the temp table. Next, insert the ones in the temp table into the final table. Finally, remove the temp table.
4) Last, perform a VACUUM, because Redshift does not automatically reclaim and reuse space that has been freed after deletes or updates. The VACUUM operation also leaves the table sorted. More info: http://docs.aws.amazon.com/redshift/latest/dg/t_Reclaiming_storage_space202.html
Right now we vacuum every time we insert new data; we will see if for performance reasons we need to do it less often.
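The staging-table UPSERT described above can be sketched with illustrative SQL built in Ruby. Table and column names here are assumptions for the sake of the example; the adapter's real statements live in its SQL module:

```ruby
# Illustrative sketch of the staging-table upsert pattern described above.
# Table/column names (events, events_staging, s3_import, time_gen) are
# hypothetical; the real SQL is generated by this class's SQL module.

# 1) Stage the raw S3 import, duplicates included, mapping NULLs to ''.
create_temp = <<~SQL
  CREATE TEMP TABLE events_staging AS
  SELECT COALESCE(cinstance, '') AS cinstance,
         COALESCE(uinstance, '') AS uinstance,
         metric, period, timestamp, time_gen, value
  FROM s3_import;
SQL

# 2) Deduplicate inside the staging table (keep the newest time_gen).
dedup = <<~SQL
  DELETE FROM events_staging
  USING events_staging newer
  WHERE events_staging.cinstance = newer.cinstance
    AND events_staging.uinstance = newer.uinstance
    AND events_staging.metric = newer.metric
    AND events_staging.period = newer.period
    AND events_staging.timestamp = newer.timestamp
    AND events_staging.time_gen < newer.time_gen;
SQL

# 3) Atomically replace matching rows in the final table.
upsert = <<~SQL
  BEGIN;
  DELETE FROM events
  USING events_staging
  WHERE events.cinstance = events_staging.cinstance
    AND events.uinstance = events_staging.uinstance
    AND events.metric = events_staging.metric
    AND events.period = events_staging.period
    AND events.timestamp = events_staging.timestamp;
  INSERT INTO events SELECT * FROM events_staging;
  DROP TABLE events_staging;
  COMMIT;
SQL

# 4) Reclaim freed space and re-sort the table.
vacuum = 'VACUUM events;'
```

The delete-then-insert in step 3 runs inside one transaction, so readers never observe a state where the matching rows are missing.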
Defined Under Namespace
Modules: SQL
Constant Summary
- MissingRequiredTables = Class.new(ThreeScale::Backend::Error)
- MissingLatestS3PathRead = Class.new(ThreeScale::Backend::Error)
Class Method Summary
- .consistent_data? ⇒ Boolean
  Returns whether the data in the DB is consistent.
- .insert_path(path) ⇒ Object
  Imports a specific S3 path into Redshift.
- .insert_pending_events(silent = false) ⇒ Object
- .latest_timestamp_read ⇒ Object
  Returns a timestamp with format 'YYYYMMDDHH', or nil if the latest timestamp read does not exist in the DB.
Class Method Details
.consistent_data? ⇒ Boolean
Returns whether the data in the DB is consistent. Right now, this method only checks whether there are duplicated events, but it could be extended in the future.
# File 'lib/3scale/backend/analytics/redshift/adapter.rb', line 285

def consistent_data?
  execute_command(SQL::duplicated_events).first['count'].to_i.zero?
end
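Conceptually, SQL::duplicated_events counts groups of rows that share the same event key. A minimal in-memory illustration of that check, with a made-up event structure (not the adapter's actual schema):

```ruby
# Hypothetical in-memory version of the duplicate check that
# SQL::duplicated_events performs in Redshift.
Event = Struct.new(:cinstance, :uinstance, :metric, :period, :timestamp)

# Count event keys that appear more than once.
def duplicated_count(events)
  events.group_by(&:to_a).values.count { |group| group.size > 1 }
end

events = [
  Event.new('c1', 'u1', 'hits', 'hour', '2016010100'),
  Event.new('c1', 'u1', 'hits', 'hour', '2016010100'), # duplicate
  Event.new('c2', nil,  'hits', 'day',  '2016010100'),
]

duplicated_count(events) # → 1
```

A non-zero count here corresponds to consistent_data? returning false.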
.insert_path(path) ⇒ Object
Imports a specific S3 path into Redshift. Right now, its main use case consists of uploading past events to a path and importing only that path.
# File 'lib/3scale/backend/analytics/redshift/adapter.rb', line 264

def insert_path(path)
  # Need to check that the 'events' table exists. Do not care about
  # 'latest_s3_path_read' in this case.
  unless existing_tables_with_schema.include?(SQL::TABLES[:events])
    raise MissingRequiredTables, 'Events table is missing'
  end

  save_in_redshift("#{S3_EVENTS_BASE_PATH}#{path}")
end
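The S3 location handed to save_in_redshift is simply the base path with the given path appended. A sketch of that concatenation with a made-up base path (the real S3_EVENTS_BASE_PATH constant is defined elsewhere in the backend):

```ruby
# Hypothetical base path; the real S3_EVENTS_BASE_PATH constant is
# defined elsewhere in the backend and points at the Kinesis bucket.
S3_EVENTS_BASE_PATH = 's3://backend-events/'.freeze

# Mirrors the interpolation done at the end of insert_path.
def full_s3_path(path)
  "#{S3_EVENTS_BASE_PATH}#{path}"
end

full_s3_path('2016/01/01/10') # → "s3://backend-events/2016/01/01/10"
```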
.insert_pending_events(silent = false) ⇒ Object
# File 'lib/3scale/backend/analytics/redshift/adapter.rb', line 249

def insert_pending_events(silent = false)
  check_redshift_tables

  pending_times_utc = S3EventPaths.pending_paths
  pending_times_utc.each do |pending_time_utc|
    puts "Loading events generated in hour: #{pending_time_utc}" unless silent
    save_in_redshift(s3_path(pending_time_utc))
    save_latest_read(pending_time_utc)
  end
  pending_times_utc.last
end
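The pending paths come from S3EventPaths. Conceptually, given the latest 'YYYYMMDDHH' hour read, the pending hours are the fully elapsed hours between it and now. A hedged sketch of that computation (S3EventPaths' real implementation may differ):

```ruby
# Hypothetical re-creation of the pending-hours computation performed
# by S3EventPaths.pending_paths. Only fully elapsed hours are returned,
# since the current hour's S3 path may still be receiving events.
def pending_hours(latest_read, now)
  last = Time.utc(latest_read[0, 4].to_i, latest_read[4, 2].to_i,
                  latest_read[6, 2].to_i, latest_read[8, 2].to_i)
  hours = []
  t = last + 3600
  while t + 3600 <= now # the hour starting at t has fully elapsed
    hours << t
    t += 3600
  end
  hours
end

pending_hours('2016010110', Time.utc(2016, 1, 1, 13, 30))
# → hours 11:00 and 12:00; 13:00 is still open
```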
.latest_timestamp_read ⇒ Object
Returns a timestamp with format 'YYYYMMDDHH', or nil if the latest timestamp read does not exist in the DB.
# File 'lib/3scale/backend/analytics/redshift/adapter.rb', line 276

def latest_timestamp_read
  query_result = execute_command(SQL::LATEST_TIMESTAMP_READ)
  return nil if query_result.ntuples == 0
  query_result.first['s3_path']
end
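The returned value is a bare 'YYYYMMDDHH' string. A caller that needs a Time can parse it like this (a hypothetical helper, not part of the adapter):

```ruby
# Hypothetical helper: turn the adapter's 'YYYYMMDDHH' string into a
# UTC Time, propagating nil when there is no latest timestamp read.
def parse_hour_timestamp(ts)
  return nil if ts.nil?
  Time.utc(ts[0, 4].to_i, ts[4, 2].to_i, ts[6, 2].to_i, ts[8, 2].to_i)
end

parse_hour_timestamp('2016010223') # → 2016-01-02 23:00:00 UTC
parse_hour_timestamp(nil)          # → nil
```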