Class: Datadog::DI::ProbeNotifierWorker Private

Inherits:
Object
Defined in:
lib/datadog/di/probe_notifier_worker.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

Background worker thread for sending probe statuses and snapshots to the backend (via the agent).

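The loop inside the worker rescues exceptions raised by downstream code (StandardError and its subclasses, as caught by a bare rescue) so that an unhandled exception does not terminate the thread. This includes communication and protocol errors when sending the payloads to the agent. Rescued exceptions are logged and reported to telemetry; they are re-raised only when the internal propagate_all_exceptions setting is enabled.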
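The rescue-and-continue pattern described above can be sketched in isolation. This is a minimal illustration, not the worker's code: `run_guarded` and its `jobs` argument are hypothetical names, and the logger is assumed to be any object that responds to `warn`.

```ruby
# Hypothetical helper (not part of the library): runs each job in turn and
# logs a StandardError instead of letting it end the loop.
def run_guarded(jobs, logger)
  completed = 0
  jobs.each do |job|
    begin
      job.call
      completed += 1
    rescue => exc
      # A bare rescue catches StandardError and its subclasses only.
      logger.warn("Error in worker: #{exc.class}: #{exc}")
    end
  end
  # Number of jobs that finished without raising.
  completed
end
```

A standard `Logger.new($stdout)` could be passed as the logger. Exceptions outside the StandardError hierarchy, such as Interrupt, still propagate in this sketch.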

The worker groups the data to send into batches. The goal is to perform no more than one network operation per event type per second. There is also a limit on the length of the sending queue to prevent it from growing without bounds if upstream code generates an enormous number of events for some reason.
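To make the batching and queue limit concrete, here is a minimal sketch of a bounded queue that is drained one batch at a time. The class name `BoundedBatchQueue`, the capacity value, and the choice to drop new items when the queue is full are all assumptions for illustration; the text above does not say how the worker handles overflow.

```ruby
# Hypothetical bounded queue (not the worker's real internals).
class BoundedBatchQueue
  attr_reader :dropped

  def initialize(capacity)
    @capacity = capacity
    @items = []
    @lock = Mutex.new
    @dropped = 0
  end

  # Returns true if the item was queued, false if it was dropped
  # because the queue had already reached its capacity.
  def push(item)
    @lock.synchronize do
      if @items.length >= @capacity
        @dropped += 1
        false
      else
        @items << item
        true
      end
    end
  end

  # Removes and returns everything queued so far as a single batch,
  # so that one network operation can carry all pending items.
  def drain
    @lock.synchronize do
      batch = @items
      @items = []
      batch
    end
  end
end
```

A sender thread would call `drain` at most once per send interval and transmit the returned array in one request.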

Wake-up events are used (via ConditionVariable) to keep the thread asleep if there is no work to be done.
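The wake-up mechanism can be sketched with a Mutex and a ConditionVariable from the Ruby standard library. `WakeSignal` is a hypothetical name, and this is an assumption about the general shape of such a primitive rather than a copy of the library's semaphore class.

```ruby
# Hypothetical stand-in for a semaphore with a timed wait.
class WakeSignal
  def initialize
    @mutex = Mutex.new
    @cond = ConditionVariable.new
  end

  # Wakes one thread currently blocked in #wait, if any.
  def signal
    @mutex.synchronize { @cond.signal }
  end

  # Blocks until signaled or until timeout seconds have passed.
  # A nil timeout waits until a signal arrives.
  def wait(timeout = nil)
    @mutex.synchronize { @cond.wait(@mutex, timeout) }
  end
end
```

In this simple form a signal sent while no thread is waiting is not remembered, so a caller that must not miss a wake-up should either signal again or pass a finite timeout to `wait`.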


Constructor Details

#initialize(settings, transport, logger, telemetry: nil) ⇒ ProbeNotifierWorker

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a new instance of ProbeNotifierWorker.



# File 'lib/datadog/di/probe_notifier_worker.rb', line 26

def initialize(settings, transport, logger, telemetry: nil)
  @settings = settings
  @telemetry = telemetry
  @status_queue = []
  @snapshot_queue = []
  @transport = transport
  @logger = logger
  @lock = Mutex.new
  @wake = Core::Semaphore.new
  @io_in_progress = false
  @sleep_remaining = nil
  @wake_scheduled = false
  @thread = nil
  @flush = 0
end

Instance Attribute Details

#logger ⇒ Object (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/datadog/di/probe_notifier_worker.rb', line 43

def logger
  @logger
end

#settings ⇒ Object (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/datadog/di/probe_notifier_worker.rb', line 42

def settings
  @settings
end

#telemetry ⇒ Object (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/datadog/di/probe_notifier_worker.rb', line 44

def telemetry
  @telemetry
end

Instance Method Details

#flush ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Waits for the background thread to send pending notifications.

This method waits for the notification queues to become empty rather than for a particular set of notifications to be sent out. It should therefore only be called when no other thread is generating notifications in parallel.

This method is used by the test suite to wait until notifications have been sent out, and could be used for graceful stopping of the worker thread.



# File 'lib/datadog/di/probe_notifier_worker.rb', line 117

def flush
  @lock.synchronize do
    @flush += 1
  end
  begin
    loop do
      if @thread.nil? || !@thread.alive?
        return
      end

      io_in_progress, queues_empty = @lock.synchronize do
        [io_in_progress?, status_queue.empty? && snapshot_queue.empty?]
      end

      if io_in_progress
        # If we just call Thread.pass we could be in a busy loop -
        # add a sleep.
        sleep 0.25
        next
      elsif queues_empty
        break
      else
        wake.signal
        sleep 0.25
        next
      end
    end
  ensure
    @lock.synchronize do
      @flush -= 1
    end
  end
end
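The polling approach used by flush (check a condition, sleep briefly, check again) can be sketched as a standalone helper. `wait_until` is a hypothetical name, and the deadline is an addition for illustration; the listing above polls without an upper time limit.

```ruby
# Hypothetical helper (not part of the library): polls the given block until
# it returns a truthy value or the timeout elapses.
def wait_until(timeout:, interval: 0.25)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  loop do
    return true if yield
    return false if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
    # Sleep between checks to avoid a busy loop.
    sleep interval
  end
end
```

A test could call `wait_until(timeout: 5) { queue.empty? }` and fail when the result is false. As with flush, the result is only meaningful if nothing else is adding to the queue while the caller waits.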

#start ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/datadog/di/probe_notifier_worker.rb', line 46

def start
  return if @thread
  @thread = Thread.new do
    loop do
      # TODO If stop is requested, we stop immediately without
      # flushing the submissions. Should we send pending submissions
      # and then quit?
      break if @stop_requested

      # If a flush was requested, send immediately and do not
      # wait for the cooldown period.
      if @lock.synchronize { @flush } == 0
        sleep_remaining = @lock.synchronize do
          if sleep_remaining && sleep_remaining > 0
            # Recalculate how much sleep time is remaining, then sleep that long.
            set_sleep_remaining
          else
            0
          end
        end

        if sleep_remaining > 0
          # Do not need to update @wake_scheduled here because
          # wake-up is already scheduled for the earliest possible time.
          wake.wait(sleep_remaining)
          next
        end
      end

      begin
        more = maybe_send
      rescue => exc
        raise if settings.dynamic_instrumentation.internal.propagate_all_exceptions

        logger.warn("Error in probe notifier worker: #{exc.class}: #{exc} (at #{exc.backtrace.first})")
        telemetry&.report(exc, description: "Error in probe notifier worker")
      end
      @lock.synchronize do
        @wake_scheduled = more
      end
      wake.wait(more ? min_send_interval : nil)
    end
  end
end

#stop(timeout = 1) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Stops the background thread.

Attempts a graceful stop with the specified timeout, then falls back to killing the thread using Thread#kill.



# File 'lib/datadog/di/probe_notifier_worker.rb', line 95

def stop(timeout = 1)
  @stop_requested = true
  wake.signal
  if thread
    unless thread.join(timeout)
      thread.kill
    end
    @thread = nil
  end
end
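The graceful-stop-then-kill pattern relies on Thread#join returning nil when its timeout elapses before the thread finishes. The sketch below shows that pattern on its own; `stop_thread` and its return symbols are hypothetical, and the extra join after the kill is an addition for illustration that the listing above does not perform.

```ruby
# Hypothetical helper (not part of the library) showing join-then-kill.
def stop_thread(thread, timeout = 1)
  # Thread#join(timeout) returns the thread on success and nil on timeout.
  if thread.join(timeout)
    :joined
  else
    thread.kill
    # Wait for the kill to take effect before reporting.
    thread.join
    :killed
  end
end
```

For example, `stop_thread(Thread.new { sleep }, 0.05)` would be expected to take the kill branch, since a thread that sleeps forever cannot finish within the timeout.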