Class: OpenTelemetry::Exporters::Datadog::DatadogSpanProcessor

Inherits:
Object
Defined in:
lib/opentelemetry/exporters/datadog/datadog_span_processor.rb

Overview

Implementation of the duck type SpanProcessor that batches spans reported by the SDK into complete traces and then pushes them to the exporter pipeline.

All spans reported by the SDK implementation are first added to a synchronized in-memory trace storage. The storage holds at most #max_queue_size spans in total and at most #max_trace_size spans per trace; once either limit is reached, spans are dropped. When a trace is designated as "complete", it is added to a queue that is exported to the exporter pipeline every schedule_delay_millis, in batches of completed traces. The Datadog writer and transport supplied to the exporter handle most of the timeout and retry logic.
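The storage model described above can be sketched in plain Ruby. This is a minimal illustration, not the processor's code: it assumes a trace counts as complete once the number of ended spans equals the number of started spans (the private helper that makes this decision, trace_exportable?, is not shown on this page). TraceBuffer, start and finish are hypothetical names, and plain strings stand in for SDK spans. Unlike on_start, this sketch simply drops new spans at the total limit rather than evicting an unfinished trace.

```ruby
# Hypothetical, simplified model of the processor's trace storage.
# Not the real DatadogSpanProcessor API; strings stand in for spans.
class TraceBuffer
  attr_reader :ready_queue

  def initialize(max_queue_size:, max_trace_size:)
    raise ArgumentError, 'max_trace_size must not exceed max_queue_size' if max_trace_size > max_queue_size

    @max_queue_size = max_queue_size
    @max_trace_size = max_trace_size
    @mutex = Mutex.new
    @traces = Hash.new { |h, k| h[k] = [] } # trace_id => started spans
    @ended = Hash.new(0)                    # trace_id => number of ended spans
    @ready_queue = []                       # completed traces awaiting export
  end

  # Records a started span. Returns false if the span is dropped because
  # the total limit or the per-trace limit has been reached.
  def start(trace_id, span)
    @mutex.synchronize do
      return false if @traces.values.sum(&:size) >= @max_queue_size
      return false if @traces[trace_id].size >= @max_trace_size

      @traces[trace_id] << span
      true
    end
  end

  # Records an ended span; once every started span in the trace has
  # ended (assumed completion rule), the trace moves to the ready queue.
  def finish(trace_id)
    @mutex.synchronize do
      @ended[trace_id] += 1
      next unless @ended[trace_id] == @traces[trace_id].size

      @ready_queue << @traces.delete(trace_id)
      @ended.delete(trace_id)
    end
  end
end

buffer = TraceBuffer.new(max_queue_size: 4, max_trace_size: 2)
buffer.start('t1', 'root')
buffer.start('t1', 'child')
buffer.start('t1', 'grandchild') # dropped: per-trace limit of 2 reached
buffer.finish('t1')
buffer.finish('t1')
buffer.ready_queue # => [["root", "child"]]
```

Keeping started and ended counts per trace id, rather than inspecting span state, means the completion check is a constant-time comparison performed under the same lock as the update.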


Constructor Details

#initialize(exporter:, schedule_delay_millis: SCHEDULE_DELAY_MILLIS, max_queue_size: MAX_QUEUE_SIZE, max_trace_size: MAX_TRACE_SIZE) ⇒ DatadogSpanProcessor

Returns a new instance of DatadogSpanProcessor.

Raises:

  • (ArgumentError) if max_trace_size is greater than max_queue_size.


# File 'lib/opentelemetry/exporters/datadog/datadog_span_processor.rb', line 31

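def initialize(exporter:,
               schedule_delay_millis: SCHEDULE_DELAY_MILLIS,
               max_queue_size: MAX_QUEUE_SIZE,
               max_trace_size: MAX_TRACE_SIZE)
  # A single trace can never be larger than the whole buffer.
  raise ArgumentError, 'max_trace_size must not exceed max_queue_size' if max_trace_size > max_queue_size

  @exporter = exporter
  @mutex = Mutex.new
  @condition = ConditionVariable.new
  @keep_running = true
  @delay_seconds = schedule_delay_millis / 1000.0
  @max_queue_size = max_queue_size
  @max_trace_size = max_trace_size
  @spans = []

  # Per-trace bookkeeping: started spans, started/ended span counts,
  # and the queue of trace ids that are ready to be exported.
  @traces = {}
  @traces_spans_count = {}
  @traces_spans_ended_count = {}
  @check_traces_queue = []
  @_spans_dropped = false

  # Start the worker last so it never observes uninitialized state.
  @thread = Thread.new { work }
end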

Instance Method Details

#force_flush ⇒ Object

Exports all completed traces that have not yet been exported to the configured exporter. (TODO: test this explicitly.)

This method should only be called in cases where it is absolutely necessary, such as when using some FaaS providers that may suspend the process after an invocation, but before the Processor exports the completed spans.



# File 'lib/opentelemetry/exporters/datadog/datadog_span_processor.rb', line 131

def force_flush
  snapshot = lock { fetch_batch }
  export_batch(snapshot)
end
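force_flush takes its snapshot inside lock and calls export_batch outside it, so a slow export never blocks threads that are starting or finishing spans. The sketch below illustrates that split with hypothetical names (FlushingQueue, RecordingExporter). What fetch_batch and export_batch actually do, for example whether they cap the batch size, is not shown on this page, so the sketch simply drains the whole queue and skips empty exports.

```ruby
# Hypothetical illustration of "snapshot under the lock, export outside it".
class FlushingQueue
  def initialize(exporter)
    @exporter = exporter # any object responding to #export(batch)
    @mutex = Mutex.new
    @ready = []
  end

  def push(trace)
    @mutex.synchronize { @ready << trace }
  end

  def force_flush
    # Swap the queue out while holding the lock...
    snapshot = @mutex.synchronize do
      batch = @ready
      @ready = []
      batch
    end
    # ...then do the potentially slow export without holding it.
    @exporter.export(snapshot) unless snapshot.empty?
  end
end

# A stand-in exporter that records every batch it receives.
RecordingExporter = Struct.new(:batches) do
  def export(batch)
    batches << batch
  end
end

exporter = RecordingExporter.new([])
queue = FlushingQueue.new(exporter)
queue.push(%w[root child])
queue.force_flush
queue.force_flush # nothing left, so no second export
exporter.batches # => [[["root", "child"]]]
```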

#on_finish(span) ⇒ Object

Records that a span has ended and, if its trace is now complete, queues the trace for export. Thread-safe; may block while acquiring the lock.



# File 'lib/opentelemetry/exporters/datadog/datadog_span_processor.rb', line 96

def on_finish(span)
  if @keep_running == false
    OpenTelemetry.logger.warn('Already shutdown, dropping span')
    return
  end

  # TODO: determine if all "not-sampled" spans still get passed to on_finish?
  # If so then we don't need to account for Probability Sampling
  # and can likely incorporate Priority Sampling from DD
  # If not, then we need to ensure the rate from OpenTelemetry.tracer_provider.active_trace_config.sampler
  # can be exposed to the span or attached to spanData in some way
  # return unless span.context.trace_flags.sampled?

  context = span.context
  trace_id = context.trace_id

  lock do
    if traces_spans_ended_count[trace_id].nil?
      traces_spans_ended_count[trace_id] = 1
    else
      traces_spans_ended_count[trace_id] += 1
    end

    check_traces_queue.unshift(trace_id) if trace_exportable?(trace_id)
  end
end

#on_start(span, _parent_context) ⇒ Object

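Adds a started span to the in-memory trace storage. The Datadog trace-agent endpoint requires complete traces, so spans are held per trace until the trace can be exported. Thread-safe; may block while acquiring the lock.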



# File 'lib/opentelemetry/exporters/datadog/datadog_span_processor.rb', line 56

def on_start(span, _parent_context)
  context = span.context
  trace_id = context.trace_id

  lock do
    if all_spans_count(traces_spans_count) >= max_queue_size
      # instead of just dropping all new spans, dd-trace-rb drops a random trace
      # https://github.com/DataDog/dd-trace-rb/blob/c6fbf2410a60495f1b2d8912bf7ea7dc63422141/lib/ddtrace/buffer.rb#L34-L36
      # It allows for a more fair usage of the queue when under stress load,
      # and will create proportional representation of code paths being instrumented at stress time.
      unfinished_trace_id = fetch_unfinished_trace_id

      # if there are no unfinished traces able to be dropped, don't add more spans, and return early
      if unfinished_trace_id.nil?
        OpenTelemetry.logger.warn('Max spans for all traces, spans will be dropped')
        @_spans_dropped = true
        return
      end

      drop_unfinished_trace(unfinished_trace_id)
      OpenTelemetry.logger.warn('Max spans for all traces, traces will be dropped')
    end

    if traces[trace_id].nil?
      traces[trace_id] = [span]
      traces_spans_count[trace_id] = 1
    else
      if traces[trace_id].size >= max_trace_size
        OpenTelemetry.logger.warn('Max spans for trace, spans will be dropped')
        @_spans_dropped = true
        return
      end

      traces[trace_id] << span
      traces_spans_count[trace_id] += 1
    end
  end
end
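When the total limit is reached, on_start first tries to evict an unfinished trace, following the dd-trace-rb buffer linked in the comment, and drops the new span only if nothing can be evicted. The sketch below models that policy under the assumption that fetch_unfinished_trace_id picks uniformly at random among traces that still have open spans; EvictingTraceStore and its methods are hypothetical, and export of completed traces is not modeled, so completed traces simply stay in the store.

```ruby
# Hypothetical sketch of the "evict a random unfinished trace" policy.
class EvictingTraceStore
  attr_reader :traces

  def initialize(max_queue_size:, rng: Random.new)
    @max_queue_size = max_queue_size
    @rng = rng
    @traces = {}         # trace_id => started spans
    @ended = Hash.new(0) # trace_id => number of ended spans
  end

  # Returns false when the span is dropped because no trace can be evicted.
  def start(trace_id, span)
    if total_spans >= @max_queue_size
      victim = unfinished_trace_ids.sample(random: @rng)
      return false if victim.nil? # every stored trace is complete

      @traces.delete(victim)
      @ended.delete(victim)
    end
    (@traces[trace_id] ||= []) << span
    true
  end

  def finish(trace_id)
    @ended[trace_id] += 1
  end

  private

  def total_spans
    @traces.values.sum(&:size)
  end

  # A trace is unfinished while some of its started spans have not ended.
  def unfinished_trace_ids
    @traces.keys.select { |id| @ended[id] < @traces[id].size }
  end
end
```

Evicting a whole trace rather than rejecting the newest span avoids starving traces that start late under load, which is the "proportional representation" argument made in the source comment.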

#shutdown(timeout: nil) ⇒ Object

Shuts down the consumer thread, flushes the accumulated buffer, and shuts down the exporter. Blocks until the thread has finished. The timeout argument is currently ignored.



# File 'lib/opentelemetry/exporters/datadog/datadog_span_processor.rb', line 138

def shutdown(timeout: nil)
  lock do
    @keep_running = false
    @condition.signal
  end

  @thread.join
  force_flush
  @exporter.shutdown
end
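The shutdown sequence (clear @keep_running and signal @condition under the lock, join the worker, then flush what is left) can be sketched in isolation. The private #work method is not shown on this page, so the worker loop here is an assumption: it waits on the condition variable with @delay_seconds as the timeout and exports whatever is ready on each wake-up. TinyBatcher and its methods are hypothetical names.

```ruby
# Hypothetical sketch of a timed worker loop plus an orderly shutdown.
class TinyBatcher
  attr_reader :exported

  def initialize(delay_seconds: 0.05)
    @mutex = Mutex.new
    @condition = ConditionVariable.new
    @keep_running = true
    @delay_seconds = delay_seconds
    @ready = []
    @exported = []
    @thread = Thread.new { work } # started last, after all state exists
  end

  def push(trace)
    @mutex.synchronize { @ready << trace }
  end

  def shutdown
    @mutex.synchronize do
      @keep_running = false
      @condition.signal # wake the worker if it is waiting
    end
    @thread.join  # after this, only the calling thread touches @exported
    force_flush   # export anything queued after the worker's last pass
  end

  def force_flush
    batch = @mutex.synchronize { drain }
    @exported.concat(batch)
  end

  private

  def work
    loop do
      batch = @mutex.synchronize do
        # Checking the flag under the same mutex as #shutdown means a
        # signal sent before we start waiting cannot be lost.
        @condition.wait(@mutex, @delay_seconds) if @keep_running
        return unless @keep_running

        drain
      end
      @exported.concat(batch)
    end
  end

  def drain
    batch = @ready
    @ready = []
    batch
  end
end
```

Joining before the final flush guarantees the worker and force_flush never export concurrently, so each trace is exported exactly once.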