Class: ThreeScale::Backend::Analytics::Kinesis::Exporter
- Inherits: Object
  - Object
    - ThreeScale::Backend::Analytics::Kinesis::Exporter
- Defined in: lib/3scale/backend/analytics/kinesis/exporter.rb
Overview
The main responsibility of this class is to schedule Kinesis jobs. The distributed locking algorithm that we use guarantees that two jobs will not run at the same time, except in some corner cases such as the failure of one of the Redis masters. This is not a problem in our case. If two Kinesis jobs run at the same time, they will probably export the same events to Kinesis, but those events will not be imported twice into Redshift, because the import method that we use detects that two events are the same and imports only one. This detection relies on the 'time_gen' field that we attach to each event before it is sent to Kinesis.
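To illustrate why exporting the same events twice is harmless, here is a minimal sketch of import-side deduplication. It is not the actual Redshift import: the `Event` struct, its fields other than `time_gen`, the timestamp format, and the `dedup_events` helper are all assumptions made for this example.

```ruby
# Hypothetical event shape. Only the 'time_gen' field name comes from the
# overview above; the other fields and the timestamp format are assumed.
Event = Struct.new(:service, :metric, :value, :time_gen)

# Hypothetical helper: keep one copy of each event, treating events whose
# fields (including time_gen) are all identical as duplicates.
def dedup_events(events)
  events.uniq { |event| event.to_a }
end

# Two overlapping jobs export the same event; the second job also exports
# a later one.
exported_by_job_a = [Event.new('svc1', 'hits', 10, '20160101 10:00:00')]
exported_by_job_b = [Event.new('svc1', 'hits', 10, '20160101 10:00:00'),
                     Event.new('svc1', 'hits', 12, '20160101 10:05:00')]

imported = dedup_events(exported_by_job_a + exported_by_job_b)
puts imported.size
```

The event that both jobs exported is kept once, so a rare overlap between jobs costs some redundant traffic to Kinesis but does not change what ends up being imported.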
Class Method Summary
- .disable ⇒ Object
- .enable ⇒ Object
- .enabled? ⇒ Boolean
- .flush_pending_events(limit = nil) ⇒ Object
- .job_finished(lock_key) ⇒ Object
  To be called by a kinesis job once it exits so other jobs can run.
- .num_pending_events ⇒ Object
- .schedule_job ⇒ Object
Class Method Details
.disable ⇒ Object
# File 'lib/3scale/backend/analytics/kinesis/exporter.rb', line 28
def disable
  storage.del(SEND_TO_KINESIS_ENABLED_KEY)
end
.enable ⇒ Object
# File 'lib/3scale/backend/analytics/kinesis/exporter.rb', line 24
def enable
  storage.set(SEND_TO_KINESIS_ENABLED_KEY, '1')
end
.enabled? ⇒ Boolean
# File 'lib/3scale/backend/analytics/kinesis/exporter.rb', line 32
def enabled?
  storage.get(SEND_TO_KINESIS_ENABLED_KEY).to_i == 1
end
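The three methods above treat a single storage key as an on/off flag. The following sketch mimics that behaviour with an in-memory stand-in, to show that a missing key reads as disabled because `nil.to_i` is `0` in Ruby. `FakeStorage` and the key name are assumptions for illustration; the real `storage` object and the value of `SEND_TO_KINESIS_ENABLED_KEY` are not shown on this page.

```ruby
# In-memory stand-in for the storage client (hypothetical; it only mimics
# the set/get/del calls used by enable, enabled? and disable).
class FakeStorage
  def initialize
    @data = {}
  end

  def set(key, value)
    @data[key] = value.to_s
  end

  def get(key)
    @data[key] # nil when the key does not exist
  end

  def del(key)
    @data.delete(key)
  end
end

FLAG_KEY = 'send_to_kinesis:enabled' # assumed key name, for this sketch only

storage = FakeStorage.new
enabled = -> { storage.get(FLAG_KEY).to_i == 1 }

state_before = enabled.call # key missing: nil.to_i is 0, so disabled
storage.set(FLAG_KEY, '1')
state_after_enable = enabled.call
storage.del(FLAG_KEY)
state_after_disable = enabled.call

puts [state_before, state_after_enable, state_after_disable].inspect
```

Deleting the key rather than writing a '0' means the exporter is disabled by default on a fresh store.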
.flush_pending_events(limit = nil) ⇒ Object
# File 'lib/3scale/backend/analytics/kinesis/exporter.rb', line 45
def flush_pending_events(limit = nil)
  flushed_events = 0
  if enabled?
    lock_key = dist_lock.lock
    if lock_key
      flushed_events = kinesis_adapter.flush(limit)
      job_finished(lock_key) # flush is not asynchronous
    end
  end
  flushed_events
end
.job_finished(lock_key) ⇒ Object
To be called by a kinesis job once it exits so other jobs can run
# File 'lib/3scale/backend/analytics/kinesis/exporter.rb', line 62
def job_finished(lock_key)
  dist_lock.unlock if lock_key == dist_lock.current_lock_key
end
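The key comparison in `job_finished` means a job only releases a lock that it still owns. A plausible scenario where this matters, assuming the distributed lock can expire while a job is still running, is sketched below. `FakeDistLock`, its `expire!` method, and the two-argument `job_finished` are hypothetical stand-ins; the real `dist_lock` implementation is not shown on this page.

```ruby
require 'securerandom'

# In-memory stand-in for the distributed lock (hypothetical).
class FakeDistLock
  attr_reader :current_lock_key

  # Returns a fresh key when the lock is free, nil when it is already held.
  def lock
    return nil if @current_lock_key
    @current_lock_key = SecureRandom.hex(8)
  end

  def unlock
    @current_lock_key = nil
  end

  # Simulates the lock timing out while its owner is still running
  # (assumed behaviour, not taken from the source).
  def expire!
    @current_lock_key = nil
  end
end

# Same guard as Exporter.job_finished, with the lock passed in explicitly.
def job_finished(dist_lock, lock_key)
  dist_lock.unlock if lock_key == dist_lock.current_lock_key
end

dist_lock = FakeDistLock.new
stale_key = dist_lock.lock   # first job acquires the lock
dist_lock.expire!            # lock times out before the first job finishes
fresh_key = dist_lock.lock   # second job acquires the lock

job_finished(dist_lock, stale_key) # late first job: keys differ, no unlock
still_held = dist_lock.current_lock_key == fresh_key

job_finished(dist_lock, fresh_key) # current owner: lock is released
released = dist_lock.current_lock_key.nil?

puts [still_held, released].inspect
```

Without the comparison, the late first job would release the second job's lock and allow a third job to start alongside it.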
.num_pending_events ⇒ Object
# File 'lib/3scale/backend/analytics/kinesis/exporter.rb', line 57
def num_pending_events
  kinesis_adapter.num_pending_events
end
.schedule_job ⇒ Object
# File 'lib/3scale/backend/analytics/kinesis/exporter.rb', line 36
def schedule_job
  if enabled?
    lock_key = dist_lock.lock
    if lock_key
      Resque.enqueue(Job, Time.now.utc, lock_key, Time.now.utc.to_f)
    end
  end
end