Class: Datadog::DI::ProbeNotifierWorker Private
- Inherits:
-
Object
- Object
- Datadog::DI::ProbeNotifierWorker
- Defined in:
- lib/datadog/di/probe_notifier_worker.rb
Overview
This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.
Background worker thread for sending probe statuses and snapshots to the backend (via the agent).
The loop inside the worker rescues all exceptions to prevent termination due to unhandled exceptions raised by any downstream code. This includes communication and protocol errors when sending the payloads to the agent.
The worker groups the data to send into batches. The goal is to perform no more than one network operation per event type per second. There is also a limit on the length of the sending queue to prevent it from growing without bounds if upstream code generates an enormous number of events for some reason.
Wake-up events are used (via ConditionVariable) to keep the thread asleep if there is no work to be done.
Instance Attribute Summary collapse
- #logger ⇒ Object readonly private
- #settings ⇒ Object readonly private
- #telemetry ⇒ Object readonly private
Instance Method Summary collapse
-
#flush ⇒ Object
private
Waits for background thread to send pending notifications.
-
#initialize(settings, transport, logger, telemetry: nil) ⇒ ProbeNotifierWorker
constructor
private
A new instance of ProbeNotifierWorker.
- #start ⇒ Object private
-
#stop(timeout = 1) ⇒ Object
private
Stops the background thread.
Constructor Details
#initialize(settings, transport, logger, telemetry: nil) ⇒ ProbeNotifierWorker
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns a new instance of ProbeNotifierWorker.
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/datadog/di/probe_notifier_worker.rb', line 26 def initialize(settings, transport, logger, telemetry: nil) @settings = settings @telemetry = telemetry @status_queue = [] @snapshot_queue = [] @transport = transport @logger = logger @lock = Mutex.new @wake = Core::Semaphore.new @io_in_progress = false @sleep_remaining = nil @wake_scheduled = false @thread = nil @flush = 0 end |
Instance Attribute Details
#logger ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
43 44 45 |
# File 'lib/datadog/di/probe_notifier_worker.rb', line 43 def logger @logger end |
#settings ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
42 43 44 |
# File 'lib/datadog/di/probe_notifier_worker.rb', line 42 def settings @settings end |
#telemetry ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
44 45 46 |
# File 'lib/datadog/di/probe_notifier_worker.rb', line 44 def telemetry @telemetry end |
Instance Method Details
#flush ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Waits for background thread to send pending notifications.
This method waits for the notification queue to become empty rather than for a particular set of notifications to be sent out, therefore, it should only be called when there is no parallel activity (in another thread) that causes more notifications to be generated.
This method is used by the test suite to wait until notifications have been sent out, and could be used for graceful stopping of the worker thread.
117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 |
# File 'lib/datadog/di/probe_notifier_worker.rb', line 117 def flush @lock.synchronize do @flush += 1 end begin loop do if @thread.nil? || !@thread.alive? return end io_in_progress, queues_empty = @lock.synchronize do [io_in_progress?, status_queue.empty? && snapshot_queue.empty?] end if io_in_progress # If we just call Thread.pass we could be in a busy loop - # add a sleep. sleep 0.25 next elsif queues_empty break else wake.signal sleep 0.25 next end end ensure @lock.synchronize do @flush -= 1 end end end |
#start ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/datadog/di/probe_notifier_worker.rb', line 46 def start return if @thread @thread = Thread.new do loop do # TODO If stop is requested, we stop immediately without # flushing the submissions. Should we send pending submissions # and then quit? break if @stop_requested # If a flush was requested, send immediately and do not # wait for the cooldown period. if @lock.synchronize { @flush } == 0 sleep_remaining = @lock.synchronize do if sleep_remaining && sleep_remaining > 0 # Recalculate how much sleep time is remaining, then sleep that long. set_sleep_remaining else 0 end end if sleep_remaining > 0 # Do not need to update @wake_scheduled here because # wake-up is already scheduled for the earliest possible time. wake.wait(sleep_remaining) next end end begin more = maybe_send rescue => exc raise if settings.dynamic_instrumentation.internal.propagate_all_exceptions logger.debug { "di: error in probe notifier worker: #{exc.class}: #{exc} (at #{exc.backtrace.first})" } telemetry&.report(exc, description: "Error in probe notifier worker") end @lock.synchronize do @wake_scheduled = more end wake.wait(more ? min_send_interval : nil) end end end |
#stop(timeout = 1) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Stops the background thread.
Attempts a graceful stop with the specified timeout, then falls back to killing the thread using Thread#kill.
95 96 97 98 99 100 101 102 103 104 |
# File 'lib/datadog/di/probe_notifier_worker.rb', line 95 def stop(timeout = 1) @stop_requested = true wake.signal if thread unless thread.join(timeout) thread.kill end @thread = nil end end |