Class: AmplitudeAnalytics::Workers
- Inherits:
-
Object
- Object
- AmplitudeAnalytics::Workers
- Defined in:
- lib/amplitude/workers.rb
Overview
Workers
Instance Attribute Summary collapse
-
#configuration ⇒ Object
readonly
Returns the value of attribute configuration.
-
#consumer_lock ⇒ Object
readonly
Returns the value of attribute consumer_lock.
-
#http_client ⇒ Object
readonly
Returns the value of attribute http_client.
-
#is_active ⇒ Object
readonly
Returns the value of attribute is_active.
-
#is_started ⇒ Object
readonly
Returns the value of attribute is_started.
-
#response_processor ⇒ Object
readonly
Returns the value of attribute response_processor.
-
#storage ⇒ Object
readonly
Returns the value of attribute storage.
-
#threads_pool ⇒ Object
readonly
Returns the value of attribute threads_pool.
Instance Method Summary collapse
- #buffer_consumer ⇒ Object
- #flush ⇒ Object
- #get_payload(events) ⇒ Object
-
#initialize ⇒ Workers
constructor
A new instance of Workers.
- #send(events) ⇒ Object
- #setup(configuration, storage) ⇒ Object
- #start ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize ⇒ Workers
9 10 11 12 13 14 15 16 17 18 |
# File 'lib/amplitude/workers.rb', line 9 def initialize @threads_pool = Concurrent::ThreadPoolExecutor.new(max_threads: 16) @is_active = true @consumer_lock = Mutex.new @is_started = false @configuration = nil @storage = nil @response_processor = ResponseProcessor.new @http_client = HttpClient.new end |
Instance Attribute Details
#configuration ⇒ Object (readonly)
Returns the value of attribute configuration.
7 8 9 |
# File 'lib/amplitude/workers.rb', line 7 def configuration @configuration end |
#consumer_lock ⇒ Object (readonly)
Returns the value of attribute consumer_lock.
7 8 9 |
# File 'lib/amplitude/workers.rb', line 7 def consumer_lock @consumer_lock end |
#http_client ⇒ Object (readonly)
Returns the value of attribute http_client.
7 8 9 |
# File 'lib/amplitude/workers.rb', line 7 def http_client @http_client end |
#is_active ⇒ Object (readonly)
Returns the value of attribute is_active.
7 8 9 |
# File 'lib/amplitude/workers.rb', line 7 def is_active @is_active end |
#is_started ⇒ Object (readonly)
Returns the value of attribute is_started.
7 8 9 |
# File 'lib/amplitude/workers.rb', line 7 def is_started @is_started end |
#response_processor ⇒ Object (readonly)
Returns the value of attribute response_processor.
7 8 9 |
# File 'lib/amplitude/workers.rb', line 7 def response_processor @response_processor end |
#storage ⇒ Object (readonly)
Returns the value of attribute storage.
7 8 9 |
# File 'lib/amplitude/workers.rb', line 7 def storage @storage end |
#threads_pool ⇒ Object (readonly)
Returns the value of attribute threads_pool.
7 8 9 |
# File 'lib/amplitude/workers.rb', line 7 def threads_pool @threads_pool end |
Instance Method Details
#buffer_consumer ⇒ Object
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/amplitude/workers.rb', line 75 def buffer_consumer if @is_active @storage.monitor.synchronize do @storage.lock.wait(@configuration.flush_interval_millis.to_f / 1000) loop do break unless @storage.total_events.positive? events = @storage.pull(@configuration.flush_queue_size) if events @threads_pool.post { send(events) } else wait_time = @storage.wait_time.to_f / 1000 @storage.lock.wait(wait_time) if wait_time > 0 end end end end rescue StandardError => e @configuration.logger.error("Consumer thread error: #{e}") ensure @consumer_lock.synchronize do @is_started = false end end |
#flush ⇒ Object
43 44 45 46 47 48 |
# File 'lib/amplitude/workers.rb', line 43 def flush events = @storage.pull_all unless @storage.nil? Concurrent::Future.execute do send(events) if events && !events.empty? end end |
#get_payload(events) ⇒ Object
61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/amplitude/workers.rb', line 61 def get_payload(events) payload_body = { 'api_key' => @configuration.api_key, 'events' => [] } events.each do |event| event_body = event.event_body payload_body['events'] << event_body if event_body end payload_body['options'] = @configuration. if @configuration. JSON.dump(payload_body).encode('utf-8') end |
#send(events) ⇒ Object
50 51 52 53 54 55 56 57 58 59 |
# File 'lib/amplitude/workers.rb', line 50 def send(events) url = @configuration.server_url payload = get_payload(events) res = @http_client.post(url, payload) begin @response_processor.process_response(res, events) rescue InvalidAPIKeyError @configuration.logger.error('Invalid API Key') end end |
#setup(configuration, storage) ⇒ Object
20 21 22 23 24 25 |
# File 'lib/amplitude/workers.rb', line 20 def setup(configuration, storage) @configuration = configuration @storage = storage @response_processor = ResponseProcessor.new @response_processor.setup(configuration, storage) end |
#start ⇒ Object
27 28 29 30 31 32 33 34 |
# File 'lib/amplitude/workers.rb', line 27 def start @consumer_lock.synchronize do unless @is_started @is_started = true Thread.new { buffer_consumer } end end end |
#stop ⇒ Object
36 37 38 39 40 41 |
# File 'lib/amplitude/workers.rb', line 36 def stop flush @is_active = false @is_started = true @threads_pool.shutdown end |