Class: Datadog::Workers::AsyncTransport
- Inherits:
-
Object
- Object
- Datadog::Workers::AsyncTransport
- Defined in:
- lib/ddtrace/workers.rb
Overview
Asynchronous worker that executes a Send() operation after given seconds. Under the hood, it uses Concurrent::TimerTask
so that the thread will perform a task at regular intervals. The thread can be stopped with the stop() method and can start with the start() method.
Constant Summary collapse
- DEFAULT_BUFFER_MAX_SIZE =
1000
- DEFAULT_FLUSH_INTERVAL =
1
- DEFAULT_TIMEOUT =
5
- BACK_OFF_RATIO =
1.2
- BACK_OFF_MAX =
5
- SHUTDOWN_TIMEOUT =
1
Instance Attribute Summary collapse
-
#trace_buffer ⇒ Object
readonly
Returns the value of attribute trace_buffer.
Instance Method Summary collapse
- #callback_runtime_metrics ⇒ Object
-
#callback_traces ⇒ Object
(also: #flush_data)
Callback function that process traces and executes the send_traces() method.
-
#enqueue_trace(trace) ⇒ Object
Enqueue an item in the trace internal buffer.
-
#initialize(options = {}) ⇒ AsyncTransport
constructor
A new instance of AsyncTransport.
-
#join ⇒ Object
Block until executor shutdown is complete or until timeout seconds have passed.
-
#start ⇒ Object
Start the timer execution.
-
#stop ⇒ Object
Closes all available queues and waits for the trace buffer to flush.
Constructor Details
#initialize(options = {}) ⇒ AsyncTransport
Returns a new instance of AsyncTransport.
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/ddtrace/workers.rb', line 23 def initialize( = {}) @transport = [:transport] # Callbacks @trace_task = [:on_trace] @runtime_metrics_task = [:on_runtime_metrics] # Intervals interval = .fetch(:interval, DEFAULT_FLUSH_INTERVAL) @flush_interval = interval @back_off = interval # Buffers buffer_size = .fetch(:buffer_size, DEFAULT_BUFFER_MAX_SIZE) @trace_buffer = TraceBuffer.new(buffer_size) # Threading @shutdown = ConditionVariable.new @mutex = Mutex.new @worker = nil @run = false end |
Instance Attribute Details
#trace_buffer ⇒ Object (readonly)
Returns the value of attribute trace_buffer.
20 21 22 |
# File 'lib/ddtrace/workers.rb', line 20 def trace_buffer @trace_buffer end |
Instance Method Details
#callback_runtime_metrics ⇒ Object
64 65 66 67 68 69 70 |
# File 'lib/ddtrace/workers.rb', line 64 def callback_runtime_metrics @runtime_metrics_task.call unless @runtime_metrics_task.nil? rescue StandardError => e Datadog::Tracer.log.error( "Error during runtime metrics flush. Cause: #{e} Location: #{e.backtrace.first}" ) end |
#callback_traces ⇒ Object Also known as: flush_data
Callback function that process traces and executes the send_traces() method.
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/ddtrace/workers.rb', line 47 def callback_traces return true if @trace_buffer.empty? begin traces = @trace_buffer.pop traces = Pipeline.process!(traces) @trace_task.call(traces, @transport) unless @trace_task.nil? rescue StandardError => e # ensures that the thread will not die because of an exception. # TODO[manu]: findout the reason and reschedule the send if it's not # a fatal exception Datadog::Tracer.log.error( "Error during traces flush: dropped #{traces.length} items. Cause: #{e} Location: #{e.backtrace.first}" ) end end |
#enqueue_trace(trace) ⇒ Object
Enqueue an item in the trace internal buffer. This operation is thread-safe because uses the TraceBuffer
data structure.
103 104 105 |
# File 'lib/ddtrace/workers.rb', line 103 def enqueue_trace(trace) @trace_buffer.push(trace) end |
#join ⇒ Object
Block until executor shutdown is complete or until timeout seconds have passed.
97 98 99 |
# File 'lib/ddtrace/workers.rb', line 97 def join @worker.join(SHUTDOWN_TIMEOUT) end |
#start ⇒ Object
Start the timer execution.
73 74 75 76 77 78 79 80 |
# File 'lib/ddtrace/workers.rb', line 73 def start @mutex.synchronize do return if @run @run = true Tracer.log.debug("Starting thread in the process: #{Process.pid}") @worker = Thread.new { perform } end end |
#stop ⇒ Object
Closes all available queues and waits for the trace buffer to flush
83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/ddtrace/workers.rb', line 83 def stop @mutex.synchronize do return unless @run @trace_buffer.close @run = false @shutdown.signal end join true end |