Class: AmplitudeAnalytics::Workers

Inherits:
Object
  • Object
show all
Defined in:
lib/amplitude/workers.rb

Overview

Workers

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Workers



9
10
11
12
13
14
15
16
17
18
# File 'lib/amplitude/workers.rb', line 9

# Builds a worker set that is active but not yet started.
#
# The configuration and storage are left nil here; #setup assigns them.
def initialize
  # Lifecycle flags: active from construction, consumer thread not yet running.
  @is_active = true
  @is_started = false

  # Filled in later by #setup.
  @configuration = nil
  @storage = nil

  # Collaborators: a pool capped at 16 threads, the mutex taken around
  # @is_started updates, the response processor and the HTTP client.
  @threads_pool = Concurrent::ThreadPoolExecutor.new(max_threads: 16)
  @consumer_lock = Mutex.new
  @response_processor = ResponseProcessor.new
  @http_client = HttpClient.new
end

Instance Attribute Details

#configuration ⇒ Object (readonly)

Returns the value of attribute configuration.



7
8
9
# File 'lib/amplitude/workers.rb', line 7

# Reader for the configuration object handed to #setup.
#
# @return [Object, nil] the value of @configuration; nil until #setup assigns one
def configuration
  @configuration
end

#consumer_lock ⇒ Object (readonly)

Returns the value of attribute consumer_lock.



7
8
9
# File 'lib/amplitude/workers.rb', line 7

# Reader for the mutex created in #initialize.
#
# #start and #buffer_consumer hold this mutex while they change @is_started.
#
# @return [Mutex] the value of @consumer_lock
def consumer_lock
  @consumer_lock
end

#http_client ⇒ Object (readonly)

Returns the value of attribute http_client.



7
8
9
# File 'lib/amplitude/workers.rb', line 7

# Reader for the HTTP client created in #initialize; #send posts payloads
# through it.
#
# @return [HttpClient] the value of @http_client
def http_client
  @http_client
end

#is_active ⇒ Object (readonly)

Returns the value of attribute is_active.



7
8
9
# File 'lib/amplitude/workers.rb', line 7

# Reader for the "active" flag.
#
# The flag is true from construction until #stop sets it to false;
# #buffer_consumer only drains storage while it is true.
#
# @return [Boolean] the value of @is_active
def is_active
  @is_active
end

#is_started ⇒ Object (readonly)

Returns the value of attribute is_started.



7
8
9
# File 'lib/amplitude/workers.rb', line 7

# Reader for the "consumer thread started" flag.
#
# #start sets it to true before spawning the consumer thread,
# #buffer_consumer resets it to false when it exits, and #stop sets it to true.
#
# @return [Boolean] the value of @is_started
def is_started
  @is_started
end

#response_processor ⇒ Object (readonly)

Returns the value of attribute response_processor.



7
8
9
# File 'lib/amplitude/workers.rb', line 7

# Reader for the response processor.
#
# One instance is created in #initialize and #setup replaces it with a fresh,
# configured one; #send hands every HTTP response to it.
#
# @return [ResponseProcessor] the value of @response_processor
def response_processor
  @response_processor
end

#storage ⇒ Object (readonly)

Returns the value of attribute storage.



7
8
9
# File 'lib/amplitude/workers.rb', line 7

# Reader for the event storage handed to #setup.
#
# @return [Object, nil] the value of @storage; nil until #setup assigns one
def storage
  @storage
end

#threads_pool ⇒ Object (readonly)

Returns the value of attribute threads_pool.



7
8
9
# File 'lib/amplitude/workers.rb', line 7

# Reader for the thread pool created in #initialize with max_threads: 16.
#
# #buffer_consumer posts send jobs to it and #stop shuts it down.
#
# @return [Concurrent::ThreadPoolExecutor] the value of @threads_pool
def threads_pool
  @threads_pool
end

Instance Method Details

#buffer_consumer ⇒ Object



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/amplitude/workers.rb', line 75

# Body of the consumer thread spawned by #start.
#
# While the worker is active it enters the storage monitor, waits on
# @storage.lock for up to flush_interval_millis / 1000 (the name suggests
# milliseconds converted to seconds), and then repeatedly pulls batches of at
# most flush_queue_size events, posting each batch to the thread pool until
# storage reports no remaining events.
#
# A pull that yields nothing to send (nil or an empty batch) takes the wait
# branch: the loop waits on @storage.lock for storage.wait_time / 1000 when
# that is positive, instead of posting an empty request.
#
# Any StandardError is reported through the configured logger. @is_started is
# always reset to false under @consumer_lock on the way out, which lets a
# later #start spawn a new consumer thread.
def buffer_consumer
  if @is_active
    @storage.monitor.synchronize do
      @storage.lock.wait(@configuration.flush_interval_millis.to_f / 1000)

      loop do
        break unless @storage.total_events.positive?

        events = @storage.pull(@configuration.flush_queue_size)
        # An empty Array is truthy in Ruby, so emptiness is tested explicitly
        # (the same check #flush applies before sending).
        if events && !events.empty?
          @threads_pool.post { send(events) }
        else
          wait_time = @storage.wait_time.to_f / 1000
          @storage.lock.wait(wait_time) if wait_time > 0
        end
      end
    end
  end
rescue StandardError => e
  @configuration.logger.error("Consumer thread error: #{e}")
ensure
  @consumer_lock.synchronize do
    @is_started = false
  end
end

#flush ⇒ Object



43
44
45
46
47
48
# File 'lib/amplitude/workers.rb', line 43

# Pulls every event currently held by storage (when storage has been set) and
# sends them from a Concurrent::Future.
#
# The future is created even when there is nothing to send; in that case its
# block does nothing.
#
# @return [Object] whatever Concurrent::Future.execute returns
def flush
  pending = @storage&.pull_all
  Concurrent::Future.execute do
    next if !pending || pending.empty?

    send(pending)
  end
end

#get_payload(events) ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/amplitude/workers.rb', line 61

# Serializes a batch of events into the JSON request body.
#
# The body carries the configured API key, the event_body of every event that
# has one (nil/false bodies are skipped) and, when the configuration provides
# them, the request options.
#
# @param events [Array] objects responding to #event_body
# @return [String] the JSON document, encoded as UTF-8
def get_payload(events)
  payload = {
    'api_key' => @configuration.api_key,
    'events' => events.map(&:event_body).select(&:itself)
  }

  options = @configuration.options
  payload['options'] = options if options

  JSON.dump(payload).encode('utf-8')
end

#send(events) ⇒ Object



50
51
52
53
54
55
56
57
58
59
# File 'lib/amplitude/workers.rb', line 50

# Posts a batch of events to the configured server URL and passes the response
# to the response processor.
#
# An InvalidAPIKeyError raised while processing the response is logged and
# swallowed; errors from building the payload or from the HTTP call itself are
# not rescued here.
#
# NOTE: this method shadows Object#send on instances of the class; use
# #__send__ or #public_send for dynamic dispatch on a worker.
#
# @param events [Array] the events to deliver
# @return [Object] the result of process_response, or of the logger call when
#   the API key was rejected
def send(events)
  response = @http_client.post(@configuration.server_url, get_payload(events))

  begin
    @response_processor.process_response(response, events)
  rescue InvalidAPIKeyError
    @configuration.logger.error('Invalid API Key')
  end
end

#setup(configuration, storage) ⇒ Object



20
21
22
23
24
25
# File 'lib/amplitude/workers.rb', line 20

# Wires the worker to its configuration and event storage.
#
# A fresh ResponseProcessor replaces the one created in #initialize and is
# given the same configuration and storage.
#
# @param configuration [Object] stored in @configuration
# @param storage [Object] stored in @storage
# @return [Object] whatever ResponseProcessor#setup returns
def setup(configuration, storage)
  @configuration, @storage = configuration, storage

  processor = ResponseProcessor.new
  @response_processor = processor
  processor.setup(configuration, storage)
end

#start ⇒ Object



27
28
29
30
31
32
33
34
# File 'lib/amplitude/workers.rb', line 27

# Spawns the consumer thread unless one is already marked as started.
#
# The check and the flag update happen under @consumer_lock, so concurrent
# callers spawn at most one thread.
#
# @return [Thread, nil] the new consumer thread, or nil when already started
def start
  @consumer_lock.synchronize do
    next if @is_started

    @is_started = true
    Thread.new { buffer_consumer }
  end
end

#stop ⇒ Object



36
37
38
39
40
41
# File 'lib/amplitude/workers.rb', line 36

# Flushes pending events, deactivates the worker and shuts the thread pool down.
#
# @is_started is deliberately left true: #start only spawns a consumer thread
# while that flag is false.
#
# @return [Object] whatever the thread pool's #shutdown returns
def stop
  flush
  @is_active, @is_started = false, true
  @threads_pool.shutdown
end