Class: Yabeda::Datadog::Worker
- Inherits:
-
Object
- Object
- Yabeda::Datadog::Worker
- Defined in:
- lib/yabeda/datadog/worker.rb,
lib/yabeda/datadog/worker/send.rb,
lib/yabeda/datadog/worker/register.rb
Overview
Performs actions asynchronously.
Constant Summary collapse
# Sends one accumulated batch of metrics to the Datadog agent via DogStatsD.
#
# A fresh client is created for every batch and closed when the batch is done.
# Errors raised while the batch is being sent are logged and swallowed; errors
# raised while constructing the client propagate to the caller.
#
# @param accumulated_payload [Array<Hash>] entries carrying :metric, :value
#   and :tags; each metric must respond to #type and #name
SEND = proc do |accumulated_payload|
  begin
    dogstatsd = ::Datadog::Statsd.new(
      Yabeda::Datadog.config.agent_host,
      Yabeda::Datadog.config.agent_port,
      single_thread: true,
    )
    Logging.instance.debug("sending batch of #{accumulated_payload.size} metrics")

    begin
      dogstatsd.batch do |stats|
        accumulated_payload.each do |payload|
          metric = payload.fetch(:metric)
          value = payload.fetch(:value)
          tags = payload.fetch(:tags)
          # metric.type names the client method to invoke for this metric
          stats.send(metric.type, metric.name, value, tags: tags)
        end
      end
    rescue StandardError => err
      Logging.instance.error("metric sending failed: #{err.message}")
    end
  ensure
    # dogstatsd is still nil when the constructor itself raised; calling
    # #close on nil would replace the real error with a NoMethodError.
    dogstatsd.close if dogstatsd
  end
end
# Registers (updates) each accumulated metric through the Datadog HTTP API.
#
# One API client is built per batch. Every metric is handled independently:
# a failure for one metric is logged and the remaining metrics are still
# processed.
#
# @param accumulated_payload [Array<Hash>] entries carrying :metric; each
#   metric must respond to #update(client) and #name
REGISTER = proc do |accumulated_payload|
  dogapi = ::Dogapi::Client.new(Yabeda::Datadog.config.api_key, Yabeda::Datadog.config.app_key)

  accumulated_payload.each do |payload|
    metric = payload.fetch(:metric)

    begin
      ResponseHandler.call(metric) do
        metric.update(dogapi)
      end
    rescue StandardError => err
      Logging.instance.error("metric registration failed for #{metric.name}: #{err.message}")
    end
  end
end
Class Method Summary collapse
Instance Method Summary collapse
- #enqueue(action, payload) ⇒ Object
-
#initialize(queue) ⇒ Worker
constructor
A new instance of Worker.
- #spawn_threads(num_threads) ⇒ Object
- #spawned_threads_count ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize(queue) ⇒ Worker
Returns a new instance of Worker.
21 22 23 24 |
# File 'lib/yabeda/datadog/worker.rb', line 21
# Builds a worker around the given queue, starting with an empty list of
# threads.
#
# @param queue [Object] queue that actions are pushed onto (a SizedQueue
#   when the worker is created through .start)
def initialize(queue)
  @threads = []
  @queue = queue
end
Class Method Details
.start(config) ⇒ Object
10 11 12 13 14 15 16 17 18 19 |
# File 'lib/yabeda/datadog/worker.rb', line 10
# Creates a worker backed by a bounded queue and spawns its threads.
#
# @param config [#queue_size, #num_threads] supplies the queue capacity and
#   the number of threads to spawn
# @return [Worker] the started worker
def self.start(config)
  capacity = config.queue_size
  thread_count = config.num_threads
  Logging.instance.info("start worker; queue size: #{capacity}; threads #{thread_count} ")

  new(SizedQueue.new(capacity)).tap do |worker|
    worker.spawn_threads(thread_count)
  end
end
Instance Method Details
#enqueue(action, payload) ⇒ Object
26 27 28 |
# File 'lib/yabeda/datadog/worker.rb', line 26
# Puts an [action, payload] pair on the queue.
#
# @param action [Object] key of the action to perform; the worker loop looks
#   it up as a constant name on this class
# @param payload [Object] data handed to that action
def enqueue(action, payload)
  entry = [action, payload]
  queue.push(entry)
end
#spawn_threads(num_threads) ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/yabeda/datadog/worker.rb', line 30
# Starts num_threads threads. Each one repeatedly collects queued actions
# into a batch, groups the payloads by action key and calls the constant of
# this class named by each key with that group's payloads.
#
# @param num_threads [Integer] number of threads to start
# @return [true]
def spawn_threads(num_threads)
  num_threads.times do
    threads << Thread.new do
      pending = Hash.new { |groups, key| groups[key] = [] }

      while running? || actions_left?
        collected = 0

        # blocking wait for the first action of the batch
        action, payload = wait_for_action
        if action
          pending[action] << payload
          collected += 1
        end

        # top the batch up until it reaches the configured batch size
        until collected >= Yabeda::Datadog.config.batch_size
          begin
            action, payload = dequeue_action
            pending[action] << payload
            collected += 1
          rescue ThreadError
            break # stop filling the batch (original: the queue is drained)
          end
        end

        # run every action once with all payloads collected for it
        pending.each do |action_name, payloads|
          self.class.const_get(action_name, false).call(payloads)
        end
        pending.clear
      end
    end
  end

  true
end
#spawned_threads_count ⇒ Object
68 69 70 |
# File 'lib/yabeda/datadog/worker.rb', line 68
# Reports how many threads this worker currently tracks.
#
# @return [Integer]
def spawned_threads_count
  threads.length
end
#stop ⇒ Object
72 73 74 75 76 77 78 |
# File 'lib/yabeda/datadog/worker.rb', line 72
# Closes the queue, terminates every tracked thread and forgets them.
#
# NOTE(review): threads are ended with Thread#exit right after the queue is
# closed, so queued or in-flight actions look like they may be dropped;
# confirm that this is intended.
#
# @return [true]
def stop
  Logging.instance.info("stop worker")
  queue.close
  threads.each { |worker_thread| worker_thread.exit }
  threads.clear
  true
end