Class: ThreeScale::Backend::Analytics::Kinesis::Exporter

Inherits:
Object
  • Object
show all
Defined in:
lib/3scale/backend/analytics/kinesis/exporter.rb

Overview

The main responsibility of this class is to schedule Kinesis jobs. We know that the distributed locking algorithm that we are using guarantees that two jobs will not be running at the same time except in some corner cases, like in the case of a failure of one of the Redis masters. However, this is not a problem in our case. If two Kinesis jobs run at the same time, they will probably export the same events to Kinesis. However, they will not be imported twice into Redshift because the import method that we use detects that two events are the same and only imports one. This detection is done using the ‘time_gen’ field that we attach to each event before they are send to Kinesis.

Class Method Summary collapse

Class Method Details

.disableObject



28
29
30
# File 'lib/3scale/backend/analytics/kinesis/exporter.rb', line 28

def disable
  storage.del(SEND_TO_KINESIS_ENABLED_KEY)
end

.enableObject



24
25
26
# File 'lib/3scale/backend/analytics/kinesis/exporter.rb', line 24

def enable
  storage.set(SEND_TO_KINESIS_ENABLED_KEY, '1')
end

.enabled?Boolean

Returns:

  • (Boolean)


32
33
34
# File 'lib/3scale/backend/analytics/kinesis/exporter.rb', line 32

def enabled?
  storage.get(SEND_TO_KINESIS_ENABLED_KEY).to_i == 1
end

.flush_pending_events(limit = nil) ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
# File 'lib/3scale/backend/analytics/kinesis/exporter.rb', line 45

def flush_pending_events(limit = nil)
  flushed_events = 0
  if enabled?
    lock_key = dist_lock.lock
    if lock_key
      flushed_events = kinesis_adapter.flush(limit)
      job_finished(lock_key) # flush is not asynchronous
    end
  end
  flushed_events
end

.job_finished(lock_key) ⇒ Object

To be called by a kinesis job once it exits so other jobs can run



62
63
64
# File 'lib/3scale/backend/analytics/kinesis/exporter.rb', line 62

def job_finished(lock_key)
  dist_lock.unlock if lock_key == dist_lock.current_lock_key
end

.num_pending_eventsObject



57
58
59
# File 'lib/3scale/backend/analytics/kinesis/exporter.rb', line 57

def num_pending_events
  kinesis_adapter.num_pending_events
end

.schedule_jobObject



36
37
38
39
40
41
42
43
# File 'lib/3scale/backend/analytics/kinesis/exporter.rb', line 36

def schedule_job
  if enabled?
    lock_key = dist_lock.lock
    if lock_key
      Resque.enqueue(Job, Time.now.utc, lock_key, Time.now.utc.to_f)
    end
  end
end