Module: Datadog::Core::Workers::Queue Private

Included in:
Telemetry::Worker, OpenFeature::Exposures::Worker, Tracing::Workers::AsyncTraceWriter
Defined in:
lib/datadog/core/workers/queue.rb

Overview

This module is part of a private API. You should avoid using this module if possible, as it may be removed or be changed in the future.

Adds queue behavior to workers, with a buffer to which items can be queued then dequeued.

This module is included in some but not all workers. Notably, Data Streams Processor uses a queue but implements it inline rather than using this module.

The workers that do include Queue also include Polling, which in turn includes Async::Thread and IntervalLoop. This means we have e.g. in_iteration? always available in any worker that includes Queue.

Defined Under Namespace

Modules: PrependedMethods

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#bufferObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



34
35
36
37
38
# File 'lib/datadog/core/workers/queue.rb', line 34

def buffer
  # Why is this an unsynchronized Array and not a Core::Buffer
  # instance?
  @buffer ||= []
end

Class Method Details

.included(base) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



20
21
22
# File 'lib/datadog/core/workers/queue.rb', line 20

def self.included(base)
  base.prepend(PrependedMethods)
end

Instance Method Details

#dequeueObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



44
45
46
# File 'lib/datadog/core/workers/queue.rb', line 44

def dequeue
  buffer.shift
end

#enqueue(*args) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



40
41
42
# File 'lib/datadog/core/workers/queue.rb', line 40

def enqueue(*args)
  buffer.push(args)
end

#flush(timeout: nil) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Wait for the worker to finish handling all work that has already been submitted to it.

If the worker is not enabled, returns nil. If the worker is enabled, returns whether, at the point of return, there was no pending or in progress work.

Flushing can time out because there is a constant stream of work submitted at the same or higher rate than it is processed. Flushing can also fail if the worker thread is not running - this method will not flush from the calling thread.



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/datadog/core/workers/queue.rb', line 64

def flush(timeout: nil)
  # Default timeout is 5 seconds.
  # Specific workers can override it to be more or less
  timeout ||= 5

  # Nothing needs to be done if the worker is not enabled.
  return nil unless enabled?

  unless running?
    unless buffer.empty?
      # If we are asked to flush but the worker is not running,
      # do not flush from the caller thread. If the buffer is not
      # empty, it will not be flushed. Log a warning to this effect.
      #
      # We are not guaranteed to have a logger as an instance method,
      # reference the global for now - all other worker methods
      # also reference the logger globally.
      # TODO inject it into worker instances.
      Datadog.logger.debug { "Asked to flush #{self} when the worker is not running" }
      return false
    end
  end

  started = Utils::Time.get_time
  loop do
    # The AppStarted event is triggered by the worker itself,
    # from the worker thread. As such the main thread has no way
    # to delay itself until that event is queued and we need some
    # way to wait until that event is sent out to assert on it in
    # the test suite. Check the run once flag which *should*
    # indicate the event has been queued (at which point our queue
    # depth check should wait until it's sent).
    # This is still a hack because the flag can be overridden
    # either way with or without the event being sent out.
    # Note that if the AppStarted sending fails, this check
    # will return false and flushing will be blocked until the
    # 15 second timeout.
    # Note that the first wait interval between telemetry event
    # sending is 10 seconds, the timeout needs to be strictly
    # greater than that.
    return true if idle?

    return false if Utils::Time.get_time - started > timeout

    sleep 0.5
  end
end

#work_pending?Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Are there more items to be processed next?

Returns:

  • (Boolean)


49
50
51
# File 'lib/datadog/core/workers/queue.rb', line 49

def work_pending?
  !buffer.empty?
end