Class: Datadog::Workers::AsyncTransport

Inherits:
Object
Defined in:
lib/ddtrace/workers.rb

Overview

Asynchronous worker that executes a send operation at a regular interval (DEFAULT_FLUSH_INTERVAL seconds unless the :interval option is given). In the source shown on this page, the work runs on a background Thread coordinated through a Mutex and a ConditionVariable. The thread is started with the start() method and stopped with the stop() method.

Constant Summary

DEFAULT_BUFFER_MAX_SIZE = 1000
DEFAULT_FLUSH_INTERVAL = 1
DEFAULT_TIMEOUT = 5
BACK_OFF_RATIO = 1.2
BACK_OFF_MAX = 5
SHUTDOWN_TIMEOUT = 1
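This page does not show where BACK_OFF_RATIO and BACK_OFF_MAX are used. One plausible reading is a multiplicative back-off with a cap; the sketch below illustrates that idea only. The helper name next_back_off is hypothetical and is not part of the library.

```ruby
# Constant values copied from the summary above; how the library applies
# them is an assumption made for this sketch.
BACK_OFF_RATIO = 1.2
BACK_OFF_MAX = 5

# Hypothetical helper: grow the current delay by the ratio, never exceeding the cap.
def next_back_off(current)
  [current * BACK_OFF_RATIO, BACK_OFF_MAX].min
end

delay = 1
10.times { delay = next_back_off(delay) }
puts delay # the delay has reached the cap by now
```

With a ratio of 1.2, a delay that starts at 1 second reaches the 5 second cap within nine steps and stays there.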

Constructor Details

#initialize(options = {}) ⇒ AsyncTransport

Returns a new instance of AsyncTransport.



# File 'lib/ddtrace/workers.rb', line 23

def initialize(options = {})
  @transport = options[:transport]

  # Callbacks
  @trace_task = options[:on_trace]
  @runtime_metrics_task = options[:on_runtime_metrics]

  # Intervals
  interval = options.fetch(:interval, DEFAULT_FLUSH_INTERVAL)
  @flush_interval = interval
  @back_off = interval

  # Buffers
  buffer_size = options.fetch(:buffer_size, DEFAULT_BUFFER_MAX_SIZE)
  @trace_buffer = TraceBuffer.new(buffer_size)

  # Threading
  @shutdown = ConditionVariable.new
  @mutex = Mutex.new
  @worker = nil
  @run = false
end
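The constructor reads :interval and :buffer_size with Hash#fetch, so the defaults apply only when the key is absent. The sketch below isolates that lookup in a hypothetical helper, resolve_interval, to show the difference between a missing key and an explicit nil.

```ruby
DEFAULT_FLUSH_INTERVAL = 1

# Hypothetical helper that mirrors the option lookup used in the constructor.
def resolve_interval(options = {})
  options.fetch(:interval, DEFAULT_FLUSH_INTERVAL)
end

absent   = resolve_interval                  # key absent: the default is used
explicit = resolve_interval(interval: 0.5)   # key present: the caller's value wins
with_nil = resolve_interval(interval: nil)   # key present but nil: nil is returned

puts absent.inspect, explicit.inspect, with_nil.inspect
```

Passing interval: nil therefore leaves @flush_interval and @back_off set to nil, so callers that build the options hash dynamically may want to omit the key instead.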

Instance Attribute Details

#trace_buffer ⇒ Object (readonly)

Returns the value of attribute trace_buffer.



# File 'lib/ddtrace/workers.rb', line 20

def trace_buffer
  @trace_buffer
end

Instance Method Details

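#callback_runtime_metrics ⇒ Object

Calls the :on_runtime_metrics callback if one was configured. Any StandardError is logged instead of being raised.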



# File 'lib/ddtrace/workers.rb', line 64

def callback_runtime_metrics
  @runtime_metrics_task.call unless @runtime_metrics_task.nil?
rescue StandardError => e
  Datadog::Tracer.log.error(
    "Error during runtime metrics flush. Cause: #{e} Location: #{e.backtrace.first}"
  )
end

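#callback_traces ⇒ Object (also known as: flush_data)

Callback that pops the buffered traces, runs them through Pipeline.process!, and passes them to the :on_trace callback together with the transport. Returns true immediately when the buffer is empty. Errors are logged rather than raised, so the worker thread does not die because of an exception.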



# File 'lib/ddtrace/workers.rb', line 47

def callback_traces
  return true if @trace_buffer.empty?

  begin
    traces = @trace_buffer.pop
    traces = Pipeline.process!(traces)
    @trace_task.call(traces, @transport) unless @trace_task.nil?
  rescue StandardError => e
    # ensures that the thread will not die because of an exception.
    # TODO[manu]: findout the reason and reschedule the send if it's not
    # a fatal exception
    Datadog::Tracer.log.error(
      "Error during traces flush: dropped #{traces.length} items. Cause: #{e} Location: #{e.backtrace.first}"
    )
  end
end
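The :on_trace callback is invoked as @trace_task.call(traces, @transport), so any callable that accepts those two arguments fits. The sketch below shows such a callable; FakeTransport and its send_traces method are hypothetical stand-ins used only to make the example runnable, not the library's transport.

```ruby
# Hypothetical stand-in for a transport; it only records what it was given.
FakeTransport = Struct.new(:sent) do
  def send_traces(traces)
    sent.concat(traces)
  end
end

transport = FakeTransport.new([])

# A callable with the same shape as the :on_trace option: (traces, transport).
on_trace = lambda do |traces, transport_arg|
  transport_arg.send_traces(traces)
end

# Simulates what callback_traces does after popping one trace of two spans.
on_trace.call([[:span_a, :span_b]], transport)
puts transport.sent.inspect
```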

#enqueue_trace(trace) ⇒ Object

Enqueues an item in the internal trace buffer. This operation is thread-safe because it uses the TraceBuffer data structure.



# File 'lib/ddtrace/workers.rb', line 103

def enqueue_trace(trace)
  @trace_buffer.push(trace)
end
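TraceBuffer itself is not shown on this page. As a rough illustration of what a thread-safe, bounded buffer with push, pop, empty? and close could look like, here is a minimal sketch. The class name SketchBuffer and the drop-oldest policy are assumptions; the real TraceBuffer may behave differently when full.

```ruby
# Hypothetical bounded buffer, not the library's TraceBuffer.
class SketchBuffer
  def initialize(max_size)
    @max_size = max_size
    @items = []
    @mutex = Mutex.new
    @closed = false
  end

  # Append an item; when the buffer is full, drop the oldest one (assumed policy).
  def push(item)
    @mutex.synchronize do
      return if @closed
      @items.shift if @items.length >= @max_size
      @items << item
    end
  end

  # Return everything buffered so far and reset the buffer.
  def pop
    @mutex.synchronize do
      drained = @items
      @items = []
      drained
    end
  end

  def empty?
    @mutex.synchronize { @items.empty? }
  end

  # After close, further pushes are ignored.
  def close
    @mutex.synchronize { @closed = true }
  end
end

buffer = SketchBuffer.new(3)
threads = 4.times.map { |t| Thread.new { 100.times { |i| buffer.push([t, i]) } } }
threads.each(&:join)
```

Because every access goes through the same Mutex, concurrent producers cannot push the buffer past its maximum size.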

#join ⇒ Object

Blocks until the worker thread has finished or until SHUTDOWN_TIMEOUT seconds have passed.



# File 'lib/ddtrace/workers.rb', line 97

def join
  @worker.join(SHUTDOWN_TIMEOUT)
end
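Thread#join with a timeout returns nil when the deadline passes before the thread finishes, and the Thread object otherwise, so the return value of join tells the caller whether the worker actually stopped. A small standalone illustration (the thread bodies are placeholders):

```ruby
slow = Thread.new { sleep 2 }   # still running when the short deadline expires
fast = Thread.new { :done }     # finishes almost immediately

slow_result = slow.join(0.05)   # nil: timed out
fast_result = fast.join(1)      # the Thread object: finished in time

slow.kill                       # clean up the placeholder thread
puts slow_result.inspect
puts fast_result.equal?(fast)
```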

#start ⇒ Object

Starts the worker thread. Calling start on a worker that is already running does nothing.



# File 'lib/ddtrace/workers.rb', line 73

def start
  @mutex.synchronize do
    return if @run
    @run = true
    Tracer.log.debug("Starting thread in the process: #{Process.pid}")
    @worker = Thread.new { perform }
  end
end

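#stop ⇒ Object

Closes the trace buffer, signals the worker thread to shut down, and waits up to SHUTDOWN_TIMEOUT seconds for it to finish. Returns true when the worker was running and nil otherwise.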



# File 'lib/ddtrace/workers.rb', line 83

def stop
  @mutex.synchronize do
    return unless @run

    @trace_buffer.close
    @run = false
    @shutdown.signal
  end

  join
  true
end
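The perform method run by the worker thread is not shown on this page. The sketch below illustrates how start and stop can cooperate with a loop that waits on the ConditionVariable, so that stop interrupts the wait instead of sitting out the full interval. SketchWorker, its ticks counter, and the body of perform are assumptions made for illustration; the trace buffer handling is left out.

```ruby
# Hypothetical worker showing the start/stop pattern; not the library's class.
class SketchWorker
  SHUTDOWN_TIMEOUT = 1

  attr_reader :ticks

  def initialize(interval)
    @interval = interval
    @mutex = Mutex.new
    @shutdown = ConditionVariable.new
    @worker = nil
    @run = false
    @ticks = 0
  end

  def start
    @mutex.synchronize do
      return if @run
      @run = true
      @worker = Thread.new { perform }
    end
  end

  def stop
    @mutex.synchronize do
      return unless @run
      @run = false
      @shutdown.signal
    end
    @worker.join(SHUTDOWN_TIMEOUT)
    true
  end

  private

  # Assumed loop: do one unit of work, then wait until the interval elapses
  # or stop signals the condition variable, whichever comes first.
  def perform
    loop do
      @mutex.synchronize do
        return unless @run
        @ticks += 1
        @shutdown.wait(@mutex, @interval)
      end
    end
  end
end

worker = SketchWorker.new(10) # long interval: stop should not wait for it
worker.start
sleep 0.1
started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
stopped = worker.stop
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
```

Checking @run under the mutex before each wait means a signal sent while the worker is between iterations is not lost: the worker sees @run == false on its next pass and exits.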