Class: AmplitudeAnalytics::InMemoryStorage
- Defined in:
- lib/amplitude/storage.rb
Overview
InMemoryStorage class
Instance Attribute Summary collapse
-
#buffer_data ⇒ Object
readonly
Returns the value of attribute buffer_data.
-
#monitor ⇒ Object
readonly
Returns the value of attribute monitor.
-
#ready_queue ⇒ Object
readonly
Returns the value of attribute ready_queue.
-
#total_events ⇒ Object
readonly
Returns the value of attribute total_events.
-
#workers ⇒ Object
readonly
Returns the value of attribute workers.
Instance Method Summary collapse
-
#initialize ⇒ InMemoryStorage
constructor
A new instance of InMemoryStorage.
- #insert_event(total_delay, event) ⇒ Object
- #lock ⇒ Object
- #max_retry ⇒ Object
- #pull(batch_size) ⇒ Object
- #pull_all ⇒ Object
- #push(event, delay = 0) ⇒ Object
- #retry_delay(ret) ⇒ Object
- #setup(configuration, workers) ⇒ Object
- #wait_time ⇒ Object
Constructor Details
#initialize ⇒ InMemoryStorage
Returns a new instance of InMemoryStorage.
29 30 31 32 33 34 35 36 37 38 |
# File 'lib/amplitude/storage.rb', line 29 def initialize super @total_events = 0 @buffer_data = [] @ready_queue = [] @monitor = Monitor.new @buffer_lock_cv = @monitor.new_cond @configuration = nil @workers = nil end |
Instance Attribute Details
#buffer_data ⇒ Object (readonly)
Returns the value of attribute buffer_data.
27 28 29 |
# File 'lib/amplitude/storage.rb', line 27 def buffer_data @buffer_data end |
#monitor ⇒ Object (readonly)
Returns the value of attribute monitor.
27 28 29 |
# File 'lib/amplitude/storage.rb', line 27 def monitor @monitor end |
#ready_queue ⇒ Object (readonly)
Returns the value of attribute ready_queue.
27 28 29 |
# File 'lib/amplitude/storage.rb', line 27 def ready_queue @ready_queue end |
#total_events ⇒ Object (readonly)
Returns the value of attribute total_events.
27 28 29 |
# File 'lib/amplitude/storage.rb', line 27 def total_events @total_events end |
#workers ⇒ Object (readonly)
Returns the value of attribute workers.
27 28 29 |
# File 'lib/amplitude/storage.rb', line 27 def workers @workers end |
Instance Method Details
#insert_event(total_delay, event) ⇒ Object
101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 |
# File 'lib/amplitude/storage.rb', line 101 def insert_event(total_delay, event) current_time = AmplitudeAnalytics.current_milliseconds @monitor.synchronize do @ready_queue << @buffer_data.shift[1] while @buffer_data.any? && @buffer_data[0][0] <= current_time if total_delay == 0 @ready_queue << event else time_stamp = current_time + total_delay left = 0 right = @buffer_data.length - 1 while left <= right mid = (left + right) / 2 if @buffer_data[mid][0] > time_stamp right = mid - 1 else left = mid + 1 end end @buffer_data.insert(left, [time_stamp, event]) end @total_events += 1 lock.signal if @ready_queue.length >= @configuration.flush_queue_size end end |
#lock ⇒ Object
40 41 42 |
# File 'lib/amplitude/storage.rb', line 40 def lock @buffer_lock_cv end |
#max_retry ⇒ Object
44 45 46 |
# File 'lib/amplitude/storage.rb', line 44 def max_retry @configuration.flush_max_retries end |
#pull(batch_size) ⇒ Object
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/amplitude/storage.rb', line 74 def pull(batch_size) current_time = AmplitudeAnalytics.current_milliseconds @monitor.synchronize do result = @ready_queue.shift(batch_size) index = 0 while index < @buffer_data.length && index < batch_size - result.length && current_time >= @buffer_data[index][0] event = @buffer_data[index][1] result << event index += 1 end @buffer_data.slice!(0, index) @total_events -= result.length result end end |
#pull_all ⇒ Object
91 92 93 94 95 96 97 98 99 |
# File 'lib/amplitude/storage.rb', line 91 def pull_all @monitor.synchronize do result = @ready_queue + @buffer_data.map { |element| element[1] } @buffer_data.clear @ready_queue.clear @total_events = 0 result end end |
#push(event, delay = 0) ⇒ Object
63 64 65 66 67 68 69 70 71 72 |
# File 'lib/amplitude/storage.rb', line 63 def push(event, delay = 0) return false, 'Destination buffer full. Retry temporarily disabled' if event.retry && @total_events >= MAX_BUFFER_CAPACITY return false, "Event reached max retry times #{max_retry}." if event.retry >= max_retry total_delay = delay + retry_delay(event.retry) insert_event(total_delay, event) @workers.start [true, nil] end |
#retry_delay(ret) ⇒ Object
129 130 131 132 133 134 135 136 137 |
# File 'lib/amplitude/storage.rb', line 129 def retry_delay(ret) if ret > max_retry 3200 elsif ret <= 0 0 else 100 * (2**((ret - 1) / 2)) end end |
#setup(configuration, workers) ⇒ Object
58 59 60 61 |
# File 'lib/amplitude/storage.rb', line 58 def setup(configuration, workers) @configuration = configuration @workers = workers end |
#wait_time ⇒ Object
48 49 50 51 52 53 54 55 56 |
# File 'lib/amplitude/storage.rb', line 48 def wait_time if @ready_queue.any? 0 elsif @buffer_data.any? [@buffer_data[0][0] - AmplitudeAnalytics.current_milliseconds, @configuration.flush_interval_millis].min else @configuration.flush_interval_millis end end |