Class: ThreeScale::Backend::Analytics::Kinesis::Adapter
- Inherits:
-
Object
- Object
- ThreeScale::Backend::Analytics::Kinesis::Adapter
- Includes:
- Logging
- Defined in:
- lib/3scale/backend/analytics/kinesis/adapter.rb
Instance Method Summary collapse
-
#flush(limit = nil) ⇒ Object
Sends the pending events to Kinesis, even if there are not enough of them to fill 1 record.
-
#initialize(stream_name, kinesis_client, storage) ⇒ Adapter
constructor
A new instance of Adapter.
- #num_pending_events ⇒ Object
- #send_events(events) ⇒ Object
Methods included from Logging
Constructor Details
#initialize(stream_name, kinesis_client, storage) ⇒ Adapter
Returns a new instance of Adapter.
57 58 59 60 61 |
# File 'lib/3scale/backend/analytics/kinesis/adapter.rb', line 57 def initialize(stream_name, kinesis_client, storage) @stream_name = stream_name @kinesis_client = kinesis_client @storage = storage end |
Instance Method Details
#flush(limit = nil) ⇒ Object
Sends the pending events to Kinesis, even if there are not enough of them to fill 1 record. Returns the number of events correctly sent to Kinesis
85 86 87 88 89 90 91 |
# File 'lib/3scale/backend/analytics/kinesis/adapter.rb', line 85 def flush(limit = nil) pending_events = stored_pending_events events_to_flush = limit ? pending_events.take(limit) : pending_events failed_events = send_events_in_batches(events_to_flush) store_pending_events(pending_events - events_to_flush + failed_events) events_to_flush.size - failed_events.size end |
#num_pending_events ⇒ Object
93 94 95 |
# File 'lib/3scale/backend/analytics/kinesis/adapter.rb', line 93 def num_pending_events storage.scard(KINESIS_PENDING_EVENTS_KEY) end |
#send_events(events) ⇒ Object
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/3scale/backend/analytics/kinesis/adapter.rb', line 63 def send_events(events) pending_events = stored_pending_events + events # Only disable indicating emergency if bucket storage is enabled. # We do not want to indicate emergency if it was disabled manually. if limit_pending_events_reached?(pending_events.size) && Stats::Storage.enabled? Stats::Storage.disable!(true) log_bucket_creation_disabled end # Batch events until we can fill at least one record if pending_events.size >= EVENTS_PER_RECORD failed_events = send_events_in_batches(pending_events) store_pending_events(failed_events) else store_pending_events(pending_events) end end |