Class: Datadog::Tracing::Workers::AsyncTransport

Inherits:
Object
Defined in:
lib/datadog/tracing/workers.rb

Overview

Asynchronous worker that flushes buffered traces to the transport at a regular interval (DEFAULT_FLUSH_INTERVAL seconds unless overridden). In the source shown on this page, the work runs on a dedicated Thread created by #start and coordinated with a Mutex and a ConditionVariable; no Concurrent::TimerTask appears in it. The worker is started with #start and stopped with #stop.

Constant Summary

DEFAULT_BUFFER_MAX_SIZE = 1000
DEFAULT_FLUSH_INTERVAL = 1
DEFAULT_TIMEOUT = 5
BACK_OFF_RATIO = 1.2
BACK_OFF_MAX = 5
DEFAULT_SHUTDOWN_TIMEOUT = 1
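
This page does not show how BACK_OFF_RATIO and BACK_OFF_MAX are consumed. One plausible reading is a multiplicative back-off capped at a maximum, and the sketch below illustrates only that idea. The helper `next_back_off` is hypothetical and is not part of the library.

```ruby
# Hypothetical illustration: the constants mirror the values listed above,
# but the way the library combines them is an assumption.
BACK_OFF_RATIO = 1.2
BACK_OFF_MAX = 5

# Grow the wait time by the ratio, never exceeding the cap.
def next_back_off(current)
  [current * BACK_OFF_RATIO, BACK_OFF_MAX].min
end

# Starting from a 1-second delay, apply the back-off ten times.
delays = [1]
10.times { delays << next_back_off(delays.last) }
```

Under this reading, the delay grows by 20% per step and settles at the cap once the product would exceed it.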

Constructor Details

#initialize(options = {}) ⇒ AsyncTransport

Returns a new instance of AsyncTransport.



# File 'lib/datadog/tracing/workers.rb', line 22

def initialize(options = {})
  @transport = options[:transport]

  # Callbacks
  @trace_task = options[:on_trace]

  # Intervals
  interval = options.fetch(:interval, DEFAULT_FLUSH_INTERVAL)
  @flush_interval = interval
  @back_off = interval

  # Buffers
  buffer_size = options.fetch(:buffer_size, DEFAULT_BUFFER_MAX_SIZE)
  @trace_buffer = TraceBuffer.new(buffer_size)

  # Threading
  @shutdown = ConditionVariable.new
  @shutdown_timeout = options.fetch(:shutdown_timeout, DEFAULT_SHUTDOWN_TIMEOUT)
  @mutex = Mutex.new
  @worker = nil
  @run = false
end
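
The constructor reads its settings with `Hash#fetch` and a default. A small stand-alone sketch of that lookup follows; `resolve_interval` is a hypothetical helper written for this illustration, not a library method. It shows one consequence worth knowing: the default applies only when the key is absent, so an explicit `nil` is passed through unchanged.

```ruby
# Mirrors the constant listed above; redefined here so the sketch is self-contained.
DEFAULT_FLUSH_INTERVAL = 1

# Hypothetical helper reproducing the lookup used in #initialize.
def resolve_interval(options = {})
  options.fetch(:interval, DEFAULT_FLUSH_INTERVAL)
end

absent   = resolve_interval({})                  # key missing: the default is used
explicit = resolve_interval({ interval: 0.5 })   # key present: the given value is used
nil_case = resolve_interval({ interval: nil })   # key present but nil: nil is returned
```

Callers that build the options hash dynamically may therefore want to drop nil entries before passing it in.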

Instance Attribute Details

#trace_buffer ⇒ Object (readonly)

Returns the value of attribute trace_buffer.



# File 'lib/datadog/tracing/workers.rb', line 19

def trace_buffer
  @trace_buffer
end

Instance Method Details

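#callback_traces ⇒ Object Also known as: flush_data

Callback that pops the buffered traces, runs them through Pipeline.process!, and passes them to the on_trace callback together with the transport. Errors are logged rather than raised, so the worker thread survives a failed flush.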



# File 'lib/datadog/tracing/workers.rb', line 46

def callback_traces
  return true if @trace_buffer.empty?

  begin
    traces = @trace_buffer.pop
    traces = Pipeline.process!(traces)
    @trace_task.call(traces, @transport) unless @trace_task.nil? || traces.empty?
  rescue StandardError => e
    # ensures that the thread will not die because of an exception.
    # TODO[manu]: findout the reason and reschedule the send if it's not
    # a fatal exception
    Datadog.logger.error(
      "Error during traces flush: dropped #{traces.length} items. Cause: #{e} Location: #{Array(e.backtrace).first}"
    )
  end
end
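
To make the callback contract concrete, here is a minimal stand-alone sketch of the flush step. `FakeBuffer` and `flush_once` are hypothetical stand-ins, the Pipeline step is omitted, and the sketch assumes that `pop` drains the buffer and returns a collection (as the variable name `traces` suggests). Unlike the method above, the sketch returns false after a failure to keep the outcome easy to inspect.

```ruby
# Hypothetical stand-in for the trace buffer.
class FakeBuffer
  def initialize(items)
    @items = items
  end

  def empty?
    @items.empty?
  end

  # Hand back everything buffered so far and reset the buffer.
  def pop
    drained = @items
    @items = []
    drained
  end
end

# Hypothetical flush step: call on_trace with (traces, transport),
# and swallow StandardError so the caller keeps running.
def flush_once(buffer, transport, on_trace)
  return true if buffer.empty?

  traces = buffer.pop
  on_trace.call(traces, transport) unless on_trace.nil? || traces.empty?
rescue StandardError => e
  warn "flush failed: #{e.message}"
  false
end

sent = []
on_trace = ->(traces, transport) { sent << [transport, traces] }
flush_once(FakeBuffer.new([[:span_a], [:span_b]]), :fake_transport, on_trace)

# A callback that raises does not propagate the error to the caller.
failing = ->(_traces, _transport) { raise IOError, 'agent unreachable' }
result = flush_once(FakeBuffer.new([[:span_c]]), :fake_transport, failing)
```

The callback receives the whole batch in one call, so it can send all traces in a single request.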

#enqueue_trace(trace) ⇒ Object

Enqueues a trace in the internal buffer; nil or empty traces are ignored. The operation is thread-safe because it relies on the TraceBuffer data structure.



# File 'lib/datadog/tracing/workers.rb', line 99

def enqueue_trace(trace)
  return unless trace && !trace.empty?

  @trace_buffer.push(trace)
end
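
TraceBuffer's implementation is not shown on this page. The sketch below is a hypothetical bounded buffer that illustrates how a mutex can make `push` safe to call from several threads. `BoundedBuffer` is an invented name, and its overflow policy (silently dropping new items once full) is an assumption made for the example, not a description of TraceBuffer.

```ruby
# Hypothetical thread-safe buffer with a maximum size.
class BoundedBuffer
  def initialize(max_size)
    @max_size = max_size
    @items = []
    @mutex = Mutex.new
    @closed = false
  end

  # Add an item unless the buffer is closed or full (assumed policy: drop).
  def push(item)
    @mutex.synchronize do
      return if @closed || @items.length >= @max_size

      @items << item
    end
  end

  # Return everything buffered so far and reset the buffer.
  def pop
    @mutex.synchronize do
      drained = @items
      @items = []
      drained
    end
  end

  def close
    @mutex.synchronize { @closed = true }
  end
end

# Four producers push 50 items each into a buffer capped at 100.
buffer = BoundedBuffer.new(100)
producers = 4.times.map do |t|
  Thread.new { 50.times { |i| buffer.push([t, i]) } }
end
producers.each(&:join)
drained = buffer.pop
```

Because every access to the array happens inside `synchronize`, the size check and the append cannot interleave between threads, so the cap is never exceeded.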

#join ⇒ Object

Blocks until the worker thread has finished or until shutdown_timeout seconds (DEFAULT_SHUTDOWN_TIMEOUT by default) have passed.



# File 'lib/datadog/tracing/workers.rb', line 93

def join
  @worker.join(@shutdown_timeout)
end
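
The return value comes straight from Ruby's `Thread#join(limit)`, which returns nil when the limit expires and the thread itself when it finished in time. A short sketch using plain threads (nothing here is library code):

```ruby
slow = Thread.new { sleep 0.5 }
fast = Thread.new { :done }

timed_out = slow.join(0.05) # nil: the thread was still running when the limit expired
finished  = fast.join(1)    # the Thread object: it completed within the limit

slow.kill # clean up the thread that is still sleeping
```

Note that @worker is nil until #start has been called, so calling #join on a worker that was never started raises NoMethodError. #stop avoids this by returning early when the worker is not running.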

#start ⇒ Object

Starts the worker thread. Does nothing if the worker is already running.



# File 'lib/datadog/tracing/workers.rb', line 64

def start
  @mutex.synchronize do
    return if @run

    @run = true
    Datadog.logger.debug { "Starting thread for: #{self}" }
    @worker = Thread.new { perform }
    @worker.name = self.class.name unless Gem::Version.new(RUBY_VERSION) < Gem::Version.new('2.3')
    @worker.thread_variable_set(:fork_safe, true)

    nil
  end
end

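#stop ⇒ Object

Closes the trace buffer, signals the worker to shut down, and waits up to shutdown_timeout seconds for it to finish. Returns true if the worker was running and nil otherwise.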



# File 'lib/datadog/tracing/workers.rb', line 79

def stop
  @mutex.synchronize do
    return unless @run

    @trace_buffer.close
    @run = false
    @shutdown.signal
  end

  join
  true
end
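
The `perform` loop that the worker thread runs is not shown on this page. The sketch below is one way such a loop could cooperate with #start and #stop, using a timed wait on the ConditionVariable so that a stop signal wakes the thread immediately. `PeriodicWorker` and its `perform` body are hypothetical; only the start/stop shape follows the source above.

```ruby
# Hypothetical periodic worker with the same start/stop shape as above.
class PeriodicWorker
  def initialize(interval, &task)
    @interval = interval
    @task = task
    @mutex = Mutex.new
    @shutdown = ConditionVariable.new
    @run = false
    @worker = nil
  end

  def start
    @mutex.synchronize do
      return if @run # already running: do nothing

      @run = true
      @worker = Thread.new { perform }
    end
    nil
  end

  def stop
    @mutex.synchronize do
      return unless @run

      @run = false
      @shutdown.signal # wake the worker if it is waiting
    end
    @worker.join(1)
    true
  end

  private

  # Assumed loop: run the task, then sleep until the interval elapses
  # or a shutdown signal arrives. One last run happens after the signal.
  def perform
    loop do
      @task.call
      @mutex.synchronize do
        return unless @run

        @shutdown.wait(@mutex, @interval)
      end
    end
  end
end

ticks = Queue.new
worker = PeriodicWorker.new(0.01) { ticks << :tick }
worker.start
worker.start # second call is a no-op
sleep 0.1
stopped = worker.stop
```

Checking `@run` under the mutex before waiting means a stop issued between two iterations is never missed, even if the signal fires while the worker is busy with the task.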