Class: ThreeScale::Backend::Analytics::Redshift::Importer

Inherits:
Object
Defined in:
lib/3scale/backend/analytics/redshift/importer.rb

Overview

The main responsibility of this class is to schedule jobs that import events stored in S3 into Redshift. The distributed locking algorithm that we use guarantees that two jobs will not run at the same time, except in some corner cases such as a failure of one of the Redis masters. This is not a problem in our case. If two Redshift jobs run at the same time, they will try to import the same S3 paths into Redshift, but the import method that we use ensures that no duplicates are imported. Check the Redshift::Adapter class for more details on this.
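
As a rough illustration of why overlapping jobs are harmless, the sketch below imports the same batch twice into a store that drops rows it already holds. This is not the Redshift::Adapter implementation, which may deduplicate in a different way; `IdempotentStore`, its `import` method, and the row fields are hypothetical names chosen for this example.

```ruby
require 'set'

# Hypothetical in-memory stand-in for a table that ignores duplicate rows.
# The real deduplication logic lives in Redshift::Adapter and may differ.
class IdempotentStore
  attr_reader :rows

  def initialize
    @rows = Set.new
  end

  # Adds only the rows that are not already present and returns how many
  # rows were actually added.
  def import(batch)
    before = @rows.size
    @rows.merge(batch)
    @rows.size - before
  end
end

# Made-up sample rows; the field names are assumptions.
batch = [
  { service: '1', metric: 'hits', time: '2016-08-01 10:00', value: 5 },
  { service: '2', metric: 'hits', time: '2016-08-01 10:00', value: 3 }
]

store = IdempotentStore.new
first_import  = store.import(batch) # the first job adds both rows
second_import = store.import(batch) # an overlapping job adds nothing
```

Because the second import leaves the store unchanged, it does not matter whether the two jobs ran one after the other or overlapped.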

Class Method Details

.consistent_data? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/3scale/backend/analytics/redshift/importer.rb', line 42

def consistent_data?
  db_adapter.consistent_data?
end

.disable ⇒ Object



# File 'lib/3scale/backend/analytics/redshift/importer.rb', line 50

def disable
  storage.del(REDSHIFT_ENABLED_KEY)
end

.enable ⇒ Object



# File 'lib/3scale/backend/analytics/redshift/importer.rb', line 46

def enable
  storage.set(REDSHIFT_ENABLED_KEY, '1')
end

.enabled? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/3scale/backend/analytics/redshift/importer.rb', line 54

def enabled?
  storage.get(REDSHIFT_ENABLED_KEY).to_i == 1
end
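
The disable, enable, and enabled? methods above toggle and read a single flag in storage. A minimal sketch of that behavior follows, assuming `storage` behaves like a Redis client with `get`, `set`, and `del`. `FakeStorage` and the key name are hypothetical; the real value of REDSHIFT_ENABLED_KEY is not shown on this page.

```ruby
# Hypothetical in-memory stand-in for the Redis client returned by `storage`.
class FakeStorage
  def initialize
    @data = {}
  end

  def set(key, value)
    @data[key] = value.to_s
    'OK'
  end

  def get(key)
    @data[key]
  end

  def del(key)
    @data.delete(key) ? 1 : 0
  end
end

# Assumed key name, used only for this example.
enabled_key = 'example:redshift:enabled'

storage = FakeStorage.new

# Same check as enabled?: a missing key yields nil, and nil.to_i is 0.
enabled = -> { storage.get(enabled_key).to_i == 1 }

initially = enabled.call             # key missing, so the importer is off
storage.set(enabled_key, '1')        # what enable does
after_enable = enabled.call
storage.del(enabled_key)             # what disable does
after_disable = enabled.call
```

Note that the importer is disabled by default: until enable is called, the key does not exist and enabled? returns false.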

.job_finished(lock_key) ⇒ Object

To be called from a Redshift job once it exits so that other jobs can run.



# File 'lib/3scale/backend/analytics/redshift/importer.rb', line 59

def job_finished(lock_key)
  dist_lock.unlock if lock_key == dist_lock.current_lock_key
end
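
The guard in job_finished releases the lock only when the caller presents the key that currently holds it, so a job with an outdated key cannot release a lock that it no longer owns. The sketch below shows that guard with a hypothetical single-process `FakeLock`; the real `dist_lock` is backed by Redis and its internals are not shown here.

```ruby
require 'securerandom'

# Hypothetical single-process lock; the real dist_lock is Redis-backed.
class FakeLock
  attr_reader :current_lock_key

  # Returns a fresh key when the lock is free, nil otherwise.
  def lock
    return nil if @current_lock_key
    @current_lock_key = SecureRandom.hex(8)
  end

  def unlock
    @current_lock_key = nil
  end
end

dist_lock = FakeLock.new

# Same guard as job_finished: release only if the key still matches.
job_finished = lambda do |lock_key|
  dist_lock.unlock if lock_key == dist_lock.current_lock_key
end

key = dist_lock.lock
job_finished.call('stale-key')                   # ignored: key does not match
still_locked = !dist_lock.current_lock_key.nil?
job_finished.call(key)                           # the owner releases the lock
released = dist_lock.current_lock_key.nil?
```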

.latest_imported_events_time ⇒ Object

Returns a UTC time that represents the hour when the newest events imported into Redshift were generated, or nil if nothing has been imported.



# File 'lib/3scale/backend/analytics/redshift/importer.rb', line 36

def latest_imported_events_time
  latest_timestamp = db_adapter.latest_timestamp_read
  return nil if latest_timestamp.nil?
  DateTime.parse(latest_timestamp).to_time.utc
end
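
To make the conversion concrete, here is a small sketch of the same parsing steps applied to a sample string. The timestamp format is an assumption; the value returned by `db_adapter.latest_timestamp_read` may be formatted differently. `parse_latest` is a hypothetical helper name.

```ruby
require 'date'

# Mirrors the body of latest_imported_events_time, with the timestamp
# passed in instead of being read from the database adapter.
def parse_latest(latest_timestamp)
  return nil if latest_timestamp.nil?
  DateTime.parse(latest_timestamp).to_time.utc
end

# Assumed sample value with an explicit +02:00 offset.
time = parse_latest('2016-08-01 10:00:00 +0200')

# Nothing imported yet: the adapter returns nil and so does the method.
nothing = parse_latest(nil)
```

With this input, `time` is 2016-08-01 08:00:00 UTC, since the two-hour offset is removed when converting to UTC.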

.schedule_job ⇒ Object



# File 'lib/3scale/backend/analytics/redshift/importer.rb', line 24

def schedule_job
  if enabled? && Backend.production?
    lock_key = dist_lock.lock
    if lock_key
      Resque.enqueue(Job, lock_key, Time.now.utc.to_f)
    end
  end
end
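
As the code above shows, a job is enqueued only when the importer is enabled, the backend runs in production, and the lock is acquired. The following sketch reproduces that control flow with hypothetical stand-ins: `OneAtATimeLock` replaces the Redis-backed `dist_lock`, a plain array replaces the Resque queue, and the two conditions are passed in as keyword arguments.

```ruby
# Hypothetical lock that hands out one key at a time.
class OneAtATimeLock
  def initialize
    @held = false
    @counter = 0
  end

  # Returns a key when the lock is free, nil while it is held.
  def lock
    return nil if @held
    @held = true
    "lock-#{@counter += 1}"
  end

  def unlock
    @held = false
  end
end

dist_lock = OneAtATimeLock.new
queue = [] # stands in for the Resque queue

# Same control flow as schedule_job, with the checks made explicit.
schedule_job = lambda do |enabled:, production:|
  next unless enabled && production
  lock_key = dist_lock.lock
  queue << [lock_key, Time.now.utc.to_f] if lock_key
end

schedule_job.call(enabled: true, production: true)  # enqueues a job
schedule_job.call(enabled: true, production: true)  # lock held: nothing enqueued
schedule_job.call(enabled: false, production: true) # disabled: nothing enqueued
```

The lock key travels with the job so that the job can hand it back through job_finished when it exits.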