Class: ThreeScale::Backend::Analytics::Kinesis::Job

Inherits: BackgroundJob
Extended by: Logging
Defined in: lib/3scale/backend/analytics/kinesis/job.rb

Overview

This job works as follows:

1) Reads the pending events from the buckets that have not been read.
2) Parses and filters those events.
3) Sends the events to the Kinesis adapter.
4) Updates the latest bucket read, to avoid processing buckets more
   than once.

The events are sent in batches to Kinesis, but the component that does that batching is the Kinesis adapter.

Before sending the events to Kinesis, we attach a 'time_gen' attribute to each of them. This is a timestamp that indicates approximately when the event was generated, based on the bucket where it was stored. We need this attribute because there will be repeated event keys in Redis, and we need to know which one contains the most up-to-date value.

Notice that we do not send all the events in the buckets to Kinesis. This job reads several buckets each time it runs, and some events can be repeated across those buckets. The job only sends Kinesis the latest value of each event (the one in the most recent bucket), which reduces the amount of information that we need to parse, filter, and send. We still need the extra 'time_gen' field because we cannot safely assume any order in S3 when sending events to Kinesis.
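The deduplication described above can be sketched as follows. This is a hypothetical illustration, not the actual `prepare_events` implementation: it assumes bucket names are compact UTC timestamps ("YYYYMMDDHHMMSS", as produced by the bucket storage) and that each bucket maps event keys to values. Repeated keys keep only the value from the most recent bucket, and every surviving event is tagged with a 'time_gen' derived from its bucket name.

```ruby
# Hypothetical sketch of the dedup + 'time_gen' tagging step.
# events_per_bucket: { "YYYYMMDDHHMMSS" => { event_key => value, ... }, ... }
def deduplicate_events(events_per_bucket)
  result = {}
  # Process buckets oldest-first so that later buckets overwrite earlier
  # ones; only the latest value of each repeated key survives.
  events_per_bucket.sort_by { |bucket, _| bucket }.each do |bucket, events|
    # Derive an approximate generation time from the bucket name
    # ("20230101120010" => "20230101 12:00:10").
    time_gen = "#{bucket[0..7]} #{bucket[8..9]}:#{bucket[10..11]}:#{bucket[12..13]}"
    events.each do |key, value|
      result[key] = { value: value, time_gen: time_gen }
    end
  end
  result
end
```

With 'time_gen' attached, a downstream consumer can pick the newest value for a repeated key even if the records arrive out of order.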

Constant Summary

Constants inherited from BackgroundJob

BackgroundJob::EMPTY_HOOKS, BackgroundJob::Error

Class Method Summary

Methods included from Logging

enable!, included

Methods inherited from BackgroundJob

hooks, perform

Methods included from Configurable

#configuration, #configuration=, included

Class Method Details

.perform_logged(end_time_utc, lock_key, _enqueue_time) ⇒ Object

# File 'lib/3scale/backend/analytics/kinesis/job.rb', line 50

def perform_logged(end_time_utc, lock_key, _enqueue_time)
  # end_time_utc will be a string when the worker processes this job.
  # The parameter is passed through Redis as a string. We need to
  # convert it back.
  events_sent = 0

  end_time = DateTime.parse(end_time_utc).to_time.utc
  pending_events = bucket_reader.pending_events_in_buckets(
    end_time_utc: end_time, max_buckets: MAX_BUCKETS)

  unless pending_events[:events].empty?
    events = prepare_events(pending_events[:latest_bucket],
                            pending_events[:events])
    kinesis_adapter.send_events(events)
    bucket_reader.latest_bucket_read = pending_events[:latest_bucket]
    events_sent = events.size

    # We might use a different strategy to delete buckets in the
    # future, but for now, we are going to delete the buckets as they
    # are read
    bucket_storage.delete_range(pending_events[:latest_bucket])
  end

  Exporter.job_finished(lock_key)
  [true, msg_events_sent(events_sent)]
end
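As the comment at the top of the method notes, `end_time_utc` arrives at the worker as a string because job arguments are passed through Redis serialized as strings. A minimal sketch of that round trip and the conversion back (the values here are illustrative):

```ruby
require 'date'

# A Time argument enqueued for the job is serialized to its string form.
end_time_utc = Time.utc(2023, 1, 1, 12, 0, 0).to_s  # "2023-01-01 12:00:00 UTC"

# The worker converts it back, as perform_logged does.
end_time = DateTime.parse(end_time_utc).to_time.utc
```

`DateTime.parse` accepts the "YYYY-MM-DD HH:MM:SS UTC" form, and the trailing `.utc` guarantees the resulting Time is in UTC regardless of the worker's local zone.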