Class: ThreeScale::Backend::Analytics::Kinesis::Adapter

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/3scale/backend/analytics/kinesis/adapter.rb

Instance Method Summary collapse

Methods included from Logging

enable!, included

Constructor Details

#initialize(stream_name, kinesis_client, storage) ⇒ Adapter

Returns a new instance of Adapter.



57
58
59
60
61
# File 'lib/3scale/backend/analytics/kinesis/adapter.rb', line 57

def initialize(stream_name, kinesis_client, storage)
  @stream_name = stream_name
  @kinesis_client = kinesis_client
  @storage = storage
end

Instance Method Details

#flush(limit = nil) ⇒ Object

Sends the pending events to Kinesis, even if there are not enough of them to fill 1 record. Returns the number of events correctly sent to Kinesis



85
86
87
88
89
90
91
# File 'lib/3scale/backend/analytics/kinesis/adapter.rb', line 85

def flush(limit = nil)
  pending_events = stored_pending_events
  events_to_flush = limit ? pending_events.take(limit) : pending_events
  failed_events = send_events_in_batches(events_to_flush)
  store_pending_events(pending_events - events_to_flush + failed_events)
  events_to_flush.size - failed_events.size
end

#num_pending_eventsObject



93
94
95
# File 'lib/3scale/backend/analytics/kinesis/adapter.rb', line 93

def num_pending_events
  storage.scard(KINESIS_PENDING_EVENTS_KEY)
end

#send_events(events) ⇒ Object



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/3scale/backend/analytics/kinesis/adapter.rb', line 63

def send_events(events)
  pending_events = stored_pending_events + events

  # Only disable indicating emergency if bucket storage is enabled.
  # We do not want to indicate emergency if it was disabled manually.
  if limit_pending_events_reached?(pending_events.size) && Stats::Storage.enabled?
    Stats::Storage.disable!(true)
    log_bucket_creation_disabled
  end

  # Batch events until we can fill at least one record
  if pending_events.size >= EVENTS_PER_RECORD
    failed_events = send_events_in_batches(pending_events)
    store_pending_events(failed_events)
  else
    store_pending_events(pending_events)
  end
end