Class: AmplitudeAnalytics::InMemoryStorage

Inherits:
Storage
  • Object
show all
Defined in:
lib/amplitude/storage.rb

Overview

InMemoryStorage class

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeInMemoryStorage

Returns a new instance of InMemoryStorage.



29
30
31
32
33
34
35
36
37
38
# File 'lib/amplitude/storage.rb', line 29

def initialize
  super
  @total_events = 0
  @buffer_data = []
  @ready_queue = []
  @monitor = Monitor.new
  @buffer_lock_cv = @monitor.new_cond
  @configuration = nil
  @workers = nil
end

Instance Attribute Details

#buffer_dataObject (readonly)

Returns the value of attribute buffer_data.



27
28
29
# File 'lib/amplitude/storage.rb', line 27

def buffer_data
  @buffer_data
end

#monitorObject (readonly)

Returns the value of attribute monitor.



27
28
29
# File 'lib/amplitude/storage.rb', line 27

def monitor
  @monitor
end

#ready_queueObject (readonly)

Returns the value of attribute ready_queue.



27
28
29
# File 'lib/amplitude/storage.rb', line 27

def ready_queue
  @ready_queue
end

#total_eventsObject (readonly)

Returns the value of attribute total_events.



27
28
29
# File 'lib/amplitude/storage.rb', line 27

def total_events
  @total_events
end

#workersObject (readonly)

Returns the value of attribute workers.



27
28
29
# File 'lib/amplitude/storage.rb', line 27

def workers
  @workers
end

Instance Method Details

#insert_event(total_delay, event) ⇒ Object



101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/amplitude/storage.rb', line 101

def insert_event(total_delay, event)
  current_time = AmplitudeAnalytics.current_milliseconds
  @monitor.synchronize do
    @ready_queue << @buffer_data.shift[1] while @buffer_data.any? && @buffer_data[0][0] <= current_time

    if total_delay == 0
      @ready_queue << event
    else
      time_stamp = current_time + total_delay
      left = 0
      right = @buffer_data.length - 1
      while left <= right
        mid = (left + right) / 2
        if @buffer_data[mid][0] > time_stamp
          right = mid - 1
        else
          left = mid + 1
        end
      end
      @buffer_data.insert(left, [time_stamp, event])
    end

    @total_events += 1

    lock.signal if @ready_queue.length >= @configuration.flush_queue_size
  end
end

#lockObject



40
41
42
# File 'lib/amplitude/storage.rb', line 40

def lock
  @buffer_lock_cv
end

#max_retryObject



44
45
46
# File 'lib/amplitude/storage.rb', line 44

def max_retry
  @configuration.flush_max_retries
end

#pull(batch_size) ⇒ Object



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/amplitude/storage.rb', line 74

def pull(batch_size)
  current_time = AmplitudeAnalytics.current_milliseconds
  @monitor.synchronize do
    result = @ready_queue.shift(batch_size)
    index = 0
    while index < @buffer_data.length && index < batch_size - result.length &&
          current_time >= @buffer_data[index][0]
      event = @buffer_data[index][1]
      result << event
      index += 1
    end
    @buffer_data.slice!(0, index)
    @total_events -= result.length
    result
  end
end

#pull_allObject



91
92
93
94
95
96
97
98
99
# File 'lib/amplitude/storage.rb', line 91

def pull_all
  @monitor.synchronize do
    result = @ready_queue + @buffer_data.map { |element| element[1] }
    @buffer_data.clear
    @ready_queue.clear
    @total_events = 0
    result
  end
end

#push(event, delay = 0) ⇒ Object



63
64
65
66
67
68
69
70
71
72
# File 'lib/amplitude/storage.rb', line 63

def push(event, delay = 0)
  return false, 'Destination buffer full. Retry temporarily disabled' if event.retry && @total_events >= MAX_BUFFER_CAPACITY

  return false, "Event reached max retry times #{max_retry}." if event.retry >= max_retry

  total_delay = delay + retry_delay(event.retry)
  insert_event(total_delay, event)
  @workers.start
  [true, nil]
end

#retry_delay(ret) ⇒ Object



129
130
131
132
133
134
135
136
137
# File 'lib/amplitude/storage.rb', line 129

def retry_delay(ret)
  if ret > max_retry
    3200
  elsif ret <= 0
    0
  else
    100 * (2**((ret - 1) / 2))
  end
end

#setup(configuration, workers) ⇒ Object



58
59
60
61
# File 'lib/amplitude/storage.rb', line 58

def setup(configuration, workers)
  @configuration = configuration
  @workers = workers
end

#wait_timeObject



48
49
50
51
52
53
54
55
56
# File 'lib/amplitude/storage.rb', line 48

def wait_time
  if @ready_queue.any?
    0
  elsif @buffer_data.any?
    [@buffer_data[0][0] - AmplitudeAnalytics.current_milliseconds, @configuration.flush_interval_millis].min
  else
    @configuration.flush_interval_millis
  end
end