Class: ThreeScale::Backend::Analytics::Kinesis::Job
- Inherits: BackgroundJob (Object → BackgroundJob → ThreeScale::Backend::Analytics::Kinesis::Job)
- Extended by: Logging
- Defined in: lib/3scale/backend/analytics/kinesis/job.rb
Overview
This job works as follows:
1) Reads the pending events from the buckets that have not been read.
2) Parses and filters those events.
3) Sends the events to the Kinesis adapter.
4) Updates the latest bucket read, to avoid processing buckets more than once.
The events are sent to Kinesis in batches, but the batching itself is handled by the Kinesis adapter, not by this job.
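The adapter's internals are outside the scope of this page. The following is only a minimal sketch of what batching might look like when using the AWS SDK for Ruby; the class name, record layout, and partition key choice are illustrative assumptions, not the actual Kinesis adapter.

require 'aws-sdk-kinesis' # assumption: AWS SDK for Ruby v3
require 'json'

# Illustrative batcher, not the real KinesisAdapter. It splits events into
# PutRecords calls of at most 500 records (the PutRecords per-request limit).
class SimpleKinesisBatcher
  MAX_RECORDS_PER_CALL = 500

  def initialize(stream_name, client = Aws::Kinesis::Client.new)
    @stream_name = stream_name
    @client = client
  end

  def send_events(events)
    events.each_slice(MAX_RECORDS_PER_CALL) do |batch|
      @client.put_records(
        stream_name: @stream_name,
        records: batch.map do |event|
          # Partition key choice here is an assumption for the sketch.
          { data: event.to_json, partition_key: event[:key].to_s }
        end
      )
    end
  end
end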
Before sending the events to Kinesis, we attach a 'time_gen' attribute to each of them. This is a timestamp that indicates approximately when the event was generated, based on the bucket where it was stored. We need this attribute because we will have repeated event keys in Redis and we will need to know which one contains the most updated value.

Notice that we do not send all the events that are in the buckets to Kinesis. This job reads several buckets each time it runs, and some events can be repeated across those buckets. However, the job will only send to Kinesis the latest value (the one in the most recent bucket). This reduces the information that we need to parse, filter, and send. We need the extra field 'time_gen' because we cannot safely assume any order in S3 when sending events to Kinesis.
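The sketch below illustrates the 'time_gen' idea described above; it is not the job's actual prepare_events implementation. It assumes bucket names are UTC timestamps in '%Y%m%d%H%M%S' format and that events arrive as a { key => value } hash; both are assumptions made for the example.

require 'date'

# Attach an approximate generation time to each event, derived from the
# name of the bucket the event was read from (format is an assumption).
def attach_time_gen(bucket, events)
  time_gen = DateTime.strptime(bucket, '%Y%m%d%H%M%S').to_time.utc
  events.map do |key, value|
    { key: key, value: value, time_gen: time_gen }
  end
end

# When the same key shows up in several buckets, only the value with the
# most recent time_gen needs to reach Kinesis.
def latest_events(events_with_time_gen)
  events_with_time_gen
    .group_by { |event| event[:key] }
    .map { |_key, repeats| repeats.max_by { |event| event[:time_gen] } }
end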
Constant Summary
Constants inherited from BackgroundJob
BackgroundJob::EMPTY_HOOKS, BackgroundJob::Error
Class Method Summary
Methods included from Logging
Methods inherited from BackgroundJob
Methods included from Configurable
#configuration, #configuration=, included
Class Method Details
.perform_logged(end_time_utc, lock_key, _enqueue_time) ⇒ Object
# File 'lib/3scale/backend/analytics/kinesis/job.rb', line 50

def perform_logged(end_time_utc, lock_key, _enqueue_time)
  # end_time_utc will be a string when the worker processes this job.
  # The parameter is passed through Redis as a string. We need to
  # convert it back.
  events_sent = 0

  end_time = DateTime.parse(end_time_utc).to_time.utc
  pending_events = bucket_reader.pending_events_in_buckets(
      end_time_utc: end_time, max_buckets: MAX_BUCKETS)

  unless pending_events[:events].empty?
    events = prepare_events(pending_events[:latest_bucket],
                            pending_events[:events])
    kinesis_adapter.send_events(events)
    bucket_reader.latest_bucket_read = pending_events[:latest_bucket]
    events_sent = events.size

    # We might use a different strategy to delete buckets in the
    # future, but for now, we are going to delete the buckets as they
    # are read
    bucket_storage.delete_range(pending_events[:latest_bucket])
  end

  Exporter.job_finished(lock_key)
  [true, msg_events_sent(events_sent)]
end
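For context, a hypothetical enqueue call is shown below. This is only a sketch: in the real code the Exporter is responsible for scheduling the job and generating the lock key, so the queue setup and lock-key value here are illustrative assumptions.

require 'resque'

# Hypothetical example of enqueueing the job. end_time_utc is passed as a
# string and parsed back with DateTime.parse inside perform_logged.
end_time_utc = Time.now.utc.to_s
lock_key     = 'example-lock-key' # illustrative value only
Resque.enqueue(ThreeScale::Backend::Analytics::Kinesis::Job,
               end_time_utc, lock_key, Time.now.utc.to_f)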