Class: Datadog::DI::ProbeNotifierWorker Private

Inherits:
Object
  • Object
Defined in:
lib/datadog/di/probe_notifier_worker.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

Background worker thread for sending probe statuses and snapshots to the backend (via the agent).

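The loop inside the worker rescues exceptions raised by any downstream code (StandardError and its subclasses, as caught by the bare rescue in #start) so that an unhandled error does not terminate the thread. This includes communication and protocol errors when sending the events to the agent. The exception is re-raised only when the internal propagate_all_exceptions setting is enabled.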
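The rescue-and-continue behavior can be illustrated with a small standalone sketch. It is not the library's code: run_resilient_loop, the jobs array and the errors array are hypothetical names used only for illustration, and the real worker logs and reports to telemetry instead of collecting strings.

```ruby
# Hypothetical stand-in for the worker loop; none of these names come from
# the library.
def run_resilient_loop(jobs, errors)
  jobs.each do |job|
    begin
      job.call
    rescue => exc
      # A bare rescue catches StandardError and its subclasses, like the
      # `rescue => exc` in #start. The failure is recorded and the loop
      # moves on to the next job instead of terminating the thread.
      errors << "#{exc.class}: #{exc.message}"
    end
  end
end

errors = []
results = []
jobs = [
  -> { results << :first },
  -> { raise IOError, "agent unreachable" }, # simulated communication error
  -> { results << :third },
]

worker = Thread.new { run_resilient_loop(jobs, errors) }
worker.join
```

After the thread finishes, the first and third jobs have run and the failure of the second one is only recorded, which is the property the worker relies on to stay alive.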

The worker groups the data to send into batches. The goal is to perform no more than one network operation per event type per second. There is also a limit on the length of the sending queue to prevent it from growing without bounds if upstream code generates an enormous number of events for some reason.
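A minimal sketch of a bounded queue that is drained in batches is shown below. The class name BoundedBatchQueue, the limit of 3 and the choice to drop newly arriving events when the queue is full are all assumptions made for illustration; the text above does not say which events the library discards or what the actual limit is.

```ruby
# Hypothetical bounded batch queue; the class name, limit and drop policy
# are illustrative assumptions, not the library's implementation.
class BoundedBatchQueue
  attr_reader :dropped

  def initialize(limit)
    @limit = limit
    @items = []
    @dropped = 0
    @lock = Mutex.new
  end

  # Adds an event unless the queue is full; returns true if it was queued.
  def push(event)
    @lock.synchronize do
      if @items.length >= @limit
        @dropped += 1
        false
      else
        @items << event
        true
      end
    end
  end

  # Removes and returns everything queued so far as a single batch, so the
  # caller can send it in one network operation.
  def drain
    @lock.synchronize do
      batch = @items
      @items = []
      batch
    end
  end
end

queue = BoundedBatchQueue.new(3)
accepted = (1..5).map { |n| queue.push("snapshot-#{n}") }
batch = queue.drain
```

In a worker built this way there would be one such queue per event type (statuses and snapshots), each drained at most once per send interval.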

The thread sleeps while there is no work to be done and is woken by wake-up events (implemented via ConditionVariable).
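The wake-up mechanism can be sketched with Ruby's Mutex and ConditionVariable. WakeSignal below is a hypothetical class written for this illustration; the constructor on this page uses Core::Semaphore, whose internals are not shown here, so this is only an approximation of the idea of signal and wait with an optional timeout.

```ruby
# Hypothetical wake-up primitive built on Mutex + ConditionVariable.
class WakeSignal
  def initialize
    @lock = Mutex.new
    @cv = ConditionVariable.new
    @signaled = false
  end

  # Marks the signal as pending and wakes one waiting thread, if any.
  def signal
    @lock.synchronize do
      @signaled = true
      @cv.signal
    end
  end

  # Blocks until signaled or until timeout seconds elapse (nil waits
  # indefinitely). Returns true if a signal was consumed, false on timeout.
  def wait(timeout = nil)
    now = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }
    deadline = timeout && now.call + timeout
    @lock.synchronize do
      # Loop to tolerate spurious wake-ups.
      until @signaled
        remaining = deadline && deadline - now.call
        break if remaining && remaining <= 0
        @cv.wait(@lock, remaining)
      end
      woke = @signaled
      @signaled = false
      woke
    end
  end
end

wake = WakeSignal.new
timed_out = wake.wait(0.05) # nobody signals, so this returns after the timeout

sleeper = Thread.new { wake.wait } # no timeout: sleeps until signaled
wake.signal
woken = sleeper.value
```

Passing a timeout corresponds to the case where more data is pending and the thread should wake for the next send, while passing nil corresponds to sleeping until new work arrives.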

API:

  • private

Constructor Details

#initialize(settings, logger, agent_settings:, telemetry: nil) ⇒ ProbeNotifierWorker

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a new instance of ProbeNotifierWorker.

API:

  • private



# File 'lib/datadog/di/probe_notifier_worker.rb', line 26

def initialize(settings, logger, agent_settings:, telemetry: nil)
  @settings = settings
  @telemetry = telemetry
  @status_queue = []
  @snapshot_queue = []
  @agent_settings = agent_settings
  @logger = logger
  @lock = Mutex.new
  @wake = Core::Semaphore.new
  @io_in_progress = false
  @sleep_remaining = nil
  @wake_scheduled = false
  @thread = nil
  @pid = nil
  @flush = 0
end

Instance Attribute Details

#agent_settings ⇒ Object (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

API:

  • private



# File 'lib/datadog/di/probe_notifier_worker.rb', line 46

def agent_settings
  @agent_settings
end

#logger ⇒ Object (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

API:

  • private



# File 'lib/datadog/di/probe_notifier_worker.rb', line 44

def logger
  @logger
end

#settings ⇒ Object (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

API:

  • private



# File 'lib/datadog/di/probe_notifier_worker.rb', line 43

def settings
  @settings
end

#telemetry ⇒ Object (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

API:

  • private



# File 'lib/datadog/di/probe_notifier_worker.rb', line 45

def telemetry
  @telemetry
end

Instance Method Details

#flush ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Waits for background thread to send pending notifications.

This method waits for the notification queue to become empty rather than for a particular set of notifications to be sent out. It should therefore only be called when no other thread is concurrently generating notifications; otherwise the queue may never drain and the call may not return.

This method is used by the test suite to wait until notifications have been sent out, and could be used for graceful stopping of the worker thread.

API:

  • private



# File 'lib/datadog/di/probe_notifier_worker.rb', line 122

def flush
  @lock.synchronize do
    @flush += 1
  end
  begin
    loop do
      if @thread.nil? || !@thread.alive?
        return
      end

      io_in_progress, queues_empty = @lock.synchronize do
        [io_in_progress?, status_queue.empty? && snapshot_queue.empty?]
      end

      if io_in_progress
        # If we just call Thread.pass we could be in a busy loop -
        # add a sleep.
        sleep 0.25
        next
      elsif queues_empty
        break
      else
        wake.signal
        sleep 0.25
        next
      end
    end
  ensure
    @lock.synchronize do
      @flush -= 1
    end
  end
end

#start ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

API:

  • private



# File 'lib/datadog/di/probe_notifier_worker.rb', line 48

def start
  return if @thread && @pid == Process.pid
  logger.trace { "di: starting probe notifier: pid #{$$}" }
  @thread = Thread.new do
    loop do
      # TODO If stop is requested, we stop immediately without
      # flushing the submissions. Should we send pending submissions
      # and then quit?
      break if @stop_requested

      # If a flush was requested, send immediately and do not
      # wait for the cooldown period.
      if @lock.synchronize { @flush } == 0
        sleep_remaining = @lock.synchronize do
          if sleep_remaining && sleep_remaining > 0
            # Recalculate how much sleep time is remaining, then sleep that long.
            set_sleep_remaining
          else
            0
          end
        end

        if sleep_remaining > 0
          # Do not need to update @wake_scheduled here because
          # wake-up is already scheduled for the earliest possible time.
          wake.wait(sleep_remaining)
          next
        end
      end

      begin
        more = maybe_send
      rescue => exc
        raise if settings.dynamic_instrumentation.internal.propagate_all_exceptions

        logger.debug { "di: error in probe notifier worker: #{exc.class}: #{exc} (at #{exc.backtrace.first})" }
        telemetry&.report(exc, description: "Error in probe notifier worker")
      end
      @lock.synchronize do
        @wake_scheduled = more
      end
      wake.wait(more ? min_send_interval : nil)
    end
  end
  @pid = Process.pid
end

#stop(timeout = 1) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Stops the background thread.

Attempts a graceful stop with the specified timeout, then falls back to killing the thread using Thread#kill.

API:

  • private



# File 'lib/datadog/di/probe_notifier_worker.rb', line 99

def stop(timeout = 1)
  @stop_requested = true
  logger.trace { "di: stopping probe notifier: pid #{$$}" }
  wake.signal
  if thread
    unless thread.join(timeout)
      thread.kill
    end
    @thread = nil
  end
end
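The stop strategy described for #stop (join with a timeout, then fall back to Thread#kill) can be tried in isolation with the sketch below. stop_thread and the two example threads are hypothetical and exist only for this illustration; the real method also sets a stop flag and signals the wake-up primitive before joining.

```ruby
# Hypothetical helper: waits up to timeout seconds for the thread to exit
# on its own, then kills it if it is still running.
def stop_thread(thread, timeout)
  if thread.join(timeout)
    :joined
  else
    thread.kill
    thread.join # wait for the kill to take effect
    :killed
  end
end

# A cooperative thread that polls a stop flag exits within the timeout.
stop_requested = false
polite = Thread.new { sleep 0.01 until stop_requested }
stop_requested = true
polite_result = stop_thread(polite, 1)

# A thread that never checks for a stop request has to be killed.
stubborn = Thread.new { sleep }
stubborn_result = stop_thread(stubborn, 0.05)
```

Thread#join returns nil when the timeout expires, which is what selects the kill branch.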