Module: Datadog::Core::Workers::Queue Private
Defined in: lib/datadog/core/workers/queue.rb
Overview
This module is part of a private API. You should avoid using this module if possible, as it may be removed or be changed in the future.
Adds queue behavior to workers, with a buffer to which items can be queued then dequeued.
This module is included in some but not all workers. Notably, Data Streams Processor uses a queue but implements it inline rather than using this module.
The workers that do include Queue also include Polling, which in turn includes Async::Thread and IntervalLoop. As a result, methods such as in_iteration? are always available in any worker that includes Queue.
Defined Under Namespace
Modules: PrependedMethods
Instance Attribute Summary
- #buffer ⇒ Object readonly private
Class Method Summary
- .included(base) ⇒ Object private
Instance Method Summary
- #dequeue ⇒ Object private
- #enqueue(*args) ⇒ Object private
- #flush(timeout: nil) ⇒ Object private
  Wait for the worker to finish handling all work that has already been submitted to it.
- #work_pending? ⇒ Boolean private
  Are there more items to be processed next?
Instance Attribute Details
#buffer ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
    # File 'lib/datadog/core/workers/queue.rb', line 34

    def buffer
      # Why is this an unsynchronized Array and not a Core::Buffer
      # instance?
      @buffer ||= []
    end
Class Method Details
.included(base) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
    # File 'lib/datadog/core/workers/queue.rb', line 20

    def self.included(base)
      base.prepend(PrependedMethods)
    end
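The included hook above relies on Ruby's Module#prepend. The following is a minimal sketch of that pattern only. The contents of PrependedMethods are not shown on this page, so every name here (SketchQueue, WrappedMethods, SketchWorker, perform, log) is hypothetical and chosen for illustration.

```ruby
# Hypothetical names throughout; this is not the library's PrependedMethods.
module SketchQueue
  def self.included(base)
    # Prepending places WrappedMethods ahead of the including class in the
    # method lookup chain, so its methods run first and reach the class's
    # own definition through `super`.
    base.prepend(WrappedMethods)
  end

  module WrappedMethods
    def perform(*args)
      log << :before
      result = super
      log << :after
      result
    end
  end

  # Plain instance method supplied by the included module itself.
  def log
    @log ||= []
  end
end

class SketchWorker
  include SketchQueue

  def perform(*args)
    log << :perform
    args.sum
  end
end

worker = SketchWorker.new
result = worker.perform(1, 2)
```

In this sketch, WrappedMethods#perform runs before SketchWorker#perform even though the class defines its own perform after the include. That ordering is what a plain include cannot provide, which is presumably why the hook uses prepend.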
Instance Method Details
#dequeue ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
    # File 'lib/datadog/core/workers/queue.rb', line 44

    def dequeue
      buffer.shift
    end
#enqueue(*args) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
    # File 'lib/datadog/core/workers/queue.rb', line 40

    def enqueue(*args)
      buffer.push(args)
    end
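Because enqueue takes a splat and pushes the resulting args array as a single element, each dequeue returns an array of the arguments from one enqueue call, in first-in, first-out order. The sketch below copies the small methods shown on this page into a hypothetical stand-in module (SketchBuffering and SketchHolder are illustrative names, not part of the library) so that it runs without the gem.

```ruby
# Hypothetical stand-in that mirrors the buffer methods shown on this page.
module SketchBuffering
  def buffer
    @buffer ||= []
  end

  def enqueue(*args)
    # The whole argument list is stored as one buffer entry.
    buffer.push(args)
  end

  def dequeue
    # Array#shift returns nil when the buffer is empty.
    buffer.shift
  end

  def work_pending?
    !buffer.empty?
  end
end

class SketchHolder
  include SketchBuffering
end

holder = SketchHolder.new
holder.enqueue(:event, 42)
holder.enqueue(:other)

first = holder.dequeue   # [:event, 42]
second = holder.dequeue  # [:other]
third = holder.dequeue   # nil, the buffer is empty
```

A consumer written against this behavior would typically destructure the dequeued array, and would need to handle the nil returned from an empty buffer.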
#flush(timeout: nil) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Wait for the worker to finish handling all work that has already been submitted to it.
If the worker is not enabled, returns nil. If the worker is enabled, returns true if there was no pending or in-progress work at the point of return, and false otherwise. When timeout is nil, it defaults to 5 seconds; the worker is polled every 0.5 seconds until it is idle or the timeout elapses.
Flushing can time out when work is submitted at the same or a higher rate than it is processed. Flushing also fails if the worker thread is not running and the buffer is not empty: this method does not flush from the calling thread.
    # File 'lib/datadog/core/workers/queue.rb', line 64

    def flush(timeout: nil)
      # Default timeout is 5 seconds.
      # Specific workers can override it to be more or less
      timeout ||= 5

      # Nothing needs to be done if the worker is not enabled.
      return nil unless enabled?

      unless running?
        unless buffer.empty?
          # If we are asked to flush but the worker is not running,
          # do not flush from the caller thread. If the buffer is not
          # empty, it will not be flushed. Log a warning to this effect.
          #
          # We are not guaranteed to have a logger as an instance method,
          # reference the global for now - all other worker methods
          # also reference the logger globally.
          # TODO inject it into worker instances.
          Datadog.logger.debug { "Asked to flush #{self} when the worker is not running" }
          return false
        end
      end

      started = Utils::Time.get_time
      loop do
        # The AppStarted event is triggered by the worker itself,
        # from the worker thread. As such the main thread has no way
        # to delay itself until that event is queued and we need some
        # way to wait until that event is sent out to assert on it in
        # the test suite. Check the run once flag which *should*
        # indicate the event has been queued (at which point our queue
        # depth check should wait until it's sent).
        # This is still a hack because the flag can be overridden
        # either way with or without the event being sent out.
        # Note that if the AppStarted sending fails, this check
        # will return false and flushing will be blocked until the
        # 15 second timeout.
        # Note that the first wait interval between telemetry event
        # sending is 10 seconds, the timeout needs to be strictly
        # greater than that.
        return true if idle?

        return false if Utils::Time.get_time - started > timeout

        sleep 0.5
      end
    end
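The waiting part of flush is a poll-with-timeout loop: check a condition, give up once the elapsed time exceeds the timeout, otherwise sleep and retry. The sketch below isolates that shape in a hypothetical helper, wait_until, which is not part of the library. It assumes a monotonic clock read through Process.clock_gettime in place of Utils::Time.get_time, and uses a plain Array and Thread to stand in for the worker.

```ruby
# Hypothetical helper, not a library method: poll the given block until it
# returns a truthy value or the timeout elapses.
def wait_until(timeout: 5, interval: 0.5)
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  loop do
    # Success as soon as the condition holds.
    return true if yield

    # Give up once more than `timeout` seconds have elapsed.
    return false if Process.clock_gettime(Process::CLOCK_MONOTONIC) - started > timeout

    sleep interval
  end
end

# Stand-in for a worker thread draining its buffer.
pending = [1, 2, 3]
drainer = Thread.new do
  until pending.empty?
    sleep 0.01
    pending.shift
  end
end

# The caller only waits; it never drains the buffer itself.
drained = wait_until(timeout: 2, interval: 0.01) { pending.empty? }
drainer.join

# A condition that never holds runs into the timeout.
timed_out = wait_until(timeout: 0.05, interval: 0.01) { false }
```

As in flush, the calling thread in this sketch does no work on behalf of the worker. If nothing is draining the buffer, the only possible outcome is a false return after the timeout.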
#work_pending? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Are there more items to be processed next?
    # File 'lib/datadog/core/workers/queue.rb', line 49

    def work_pending?
      !buffer.empty?
    end