Class: OpenTelemetry::Exporters::Datadog::DatadogSpanProcessor
- Inherits: Object
- Defined in: lib/opentelemetry/exporters/datadog/datadog_span_processor.rb
Overview
An implementation of the SpanProcessor duck type that groups spans reported by the SDK into complete traces, then pushes them to the exporter pipeline.
All spans reported by the SDK are first added to a synchronized in-memory trace store. The store holds at most #max_queue_size spans in total and at most #max_trace_size spans per trace; once either limit is reached, spans (or whole unfinished traces) are dropped. When a trace is designated "complete", it is added to a queue that is exported to the exporter pipeline every schedule_delay_millis milliseconds, in batches of completed traces. The Datadog writer and transport supplied to the exporter handle the bulk of the timeout and retry logic.
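The storage behaviour described above can be sketched in a few lines of Ruby. This is a simplified model, not the gem's code: the class name TraceBufferSketch and its methods are hypothetical, spans are reduced to their trace ids, and "complete" is assumed to mean that as many spans have ended as were started for a trace.

```ruby
# Hypothetical, simplified model of the trace store described above.
# None of these names come from the gem.
class TraceBufferSketch
  attr_reader :dropped

  def initialize(max_queue_size:, max_trace_size:)
    @max_queue_size = max_queue_size
    @max_trace_size = max_trace_size
    @mutex = Mutex.new
    @started = Hash.new(0) # trace_id => number of spans started
    @ended = Hash.new(0)   # trace_id => number of spans ended
    @completed = []        # trace ids waiting to be exported
    @dropped = 0
  end

  # Returns true if the span was accepted, false if it was dropped.
  def start_span(trace_id)
    @mutex.synchronize do
      over_total = @started.values.sum >= @max_queue_size
      over_trace = @started[trace_id] >= @max_trace_size
      if over_total || over_trace
        @dropped += 1
        return false
      end
      @started[trace_id] += 1
      true
    end
  end

  # Assumed completeness rule: every started span has ended.
  def finish_span(trace_id)
    @mutex.synchronize do
      @ended[trace_id] += 1
      @completed << trace_id if @started[trace_id] == @ended[trace_id]
    end
  end

  # Hands back the completed trace ids and forgets their counters.
  def drain_completed
    @mutex.synchronize do
      batch = @completed.dup
      batch.each do |id|
        @started.delete(id)
        @ended.delete(id)
      end
      @completed.clear
      batch
    end
  end
end
```

In this model a periodic worker would call drain_completed once per export interval and pass the result to the exporter.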
Instance Method Summary
-
#force_flush ⇒ Object
Exports all ended traces that have not yet been exported to the configured Exporter.
-
#initialize(exporter:, schedule_delay_millis: SCHEDULE_DELAY_MILLIS, max_queue_size: MAX_QUEUE_SIZE, max_trace_size: MAX_TRACE_SIZE) ⇒ DatadogSpanProcessor
constructor
A new instance of DatadogSpanProcessor.
-
#on_finish(span) ⇒ Object
Adds a finished span to the batcher. Thread-safe; may block on the lock.
-
#on_start(span, _parent_context) ⇒ Object
Records a started span under its trace, because the Datadog trace-agent endpoint requires a complete trace to be sent. Thread-safe; may block on the lock.
-
#shutdown(timeout: nil) ⇒ Object
Shuts down the consumer thread and flushes the accumulated buffer. Blocks until the thread has finished.
Constructor Details
#initialize(exporter:, schedule_delay_millis: SCHEDULE_DELAY_MILLIS, max_queue_size: MAX_QUEUE_SIZE, max_trace_size: MAX_TRACE_SIZE) ⇒ DatadogSpanProcessor
Returns a new instance of DatadogSpanProcessor.
# File 'lib/opentelemetry/exporters/datadog/datadog_span_processor.rb', line 31

def initialize(exporter:,
               schedule_delay_millis: SCHEDULE_DELAY_MILLIS,
               max_queue_size: MAX_QUEUE_SIZE,
               max_trace_size: MAX_TRACE_SIZE)
  raise ArgumentError if max_trace_size > max_queue_size

  @exporter = exporter
  @mutex = Mutex.new
  @condition = ConditionVariable.new
  @keep_running = true
  @delay_seconds = schedule_delay_millis / 1000.0
  @max_queue_size = max_queue_size
  @max_trace_size = max_trace_size
  @spans = []
  @thread = Thread.new { work }
  @traces = {}
  @traces_spans_count = {}
  @traces_spans_ended_count = {}
  @check_traces_queue = []
  @_spans_dropped = false
end
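The constructor's argument handling can be illustrated in isolation. The sketch below is a hypothetical stand-in (ProcessorOptionsSketch is not part of the gem) that mirrors only two things visible in the source: the ArgumentError raised when max_trace_size exceeds max_queue_size, and the conversion of schedule_delay_millis to seconds. The default values are placeholders, since the values of the gem's constants do not appear on this page.

```ruby
# Hypothetical stand-in that mirrors the constructor's argument handling.
# The defaults are placeholders, NOT the gem's SCHEDULE_DELAY_MILLIS,
# MAX_QUEUE_SIZE, or MAX_TRACE_SIZE values.
class ProcessorOptionsSketch
  attr_reader :delay_seconds, :max_queue_size, :max_trace_size

  def initialize(schedule_delay_millis: 3_000, max_queue_size: 2_048, max_trace_size: 1_024)
    # A single trace can never be allowed more spans than the whole store.
    raise ArgumentError if max_trace_size > max_queue_size

    # Float division keeps sub-second delays, e.g. 500 ms becomes 0.5 s.
    @delay_seconds = schedule_delay_millis / 1000.0
    @max_queue_size = max_queue_size
    @max_trace_size = max_trace_size
  end
end

opts = ProcessorOptionsSketch.new(schedule_delay_millis: 500, max_queue_size: 10, max_trace_size: 5)
puts opts.delay_seconds
```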
Instance Method Details
#force_flush ⇒ Object
TODO: test this explicitly.
Exports all ended traces that have not yet been exported to the configured Exporter.
This method should only be called in cases where it is absolutely necessary, such as when using some FaaS providers that may suspend the process after an invocation, but before the Processor exports the completed spans.
# File 'lib/opentelemetry/exporters/datadog/datadog_span_processor.rb', line 131

def force_flush
  snapshot = lock { fetch_batch }
  export_batch(snapshot)
end
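The shape of force_flush (take a snapshot while holding the lock, then export after releasing it) can be sketched on its own. FlushSketch and RecordingExporter are hypothetical names, and the exporter is assumed to be any object that responds to export(batch); the gem's fetch_batch and export_batch helpers are not shown on this page and are not reproduced here.

```ruby
# Hypothetical exporter that just records what it was given.
class RecordingExporter
  attr_reader :batches

  def initialize
    @batches = []
  end

  def export(batch)
    @batches << batch
  end
end

# Hypothetical sketch of the snapshot-then-export pattern.
class FlushSketch
  def initialize(exporter)
    @exporter = exporter
    @mutex = Mutex.new
    @completed = []
  end

  def add_completed(trace)
    @mutex.synchronize { @completed << trace }
  end

  def force_flush
    # Hold the lock only long enough to empty the queue.
    snapshot = @mutex.synchronize { @completed.shift(@completed.size) }
    # Export outside the lock so slow I/O does not block span callbacks.
    @exporter.export(snapshot) unless snapshot.empty?
  end
end
```

Exporting outside the lock means a slow exporter delays only the flushing caller, not the threads that are starting or finishing spans.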
#on_finish(span) ⇒ Object
Adds a finished span to the batcher. Thread-safe; may block on the lock.

# File 'lib/opentelemetry/exporters/datadog/datadog_span_processor.rb', line 96

def on_finish(span)
  if @keep_running == false
    OpenTelemetry.logger.warn('Already shutdown, dropping span')
    return
  end

  # TODO: determine if all "not-sampled" spans still get passed to on_finish?
  # If so then we don't need to account for Probability Sampling
  # and can likely incorporate Priority Sampling from DD
  # If not, then we need to ensure the rate from OpenTelemetry.tracer_provider.active_trace_config.sampler
  # can be exposed to the span or attached to spanData in some way
  # return unless span.context.trace_flags.sampled?
  context = span.context
  trace_id = context.trace_id

  lock do
    if traces_spans_ended_count[trace_id].nil?
      traces_spans_ended_count[trace_id] = 1
    else
      traces_spans_ended_count[trace_id] += 1
    end

    check_traces_queue.unshift(trace_id) if trace_exportable?(trace_id)
  end
end
#on_start(span, _parent_context) ⇒ Object
Records a started span under its trace, because the Datadog trace-agent endpoint requires a complete trace to be sent. Thread-safe; may block on the lock.

# File 'lib/opentelemetry/exporters/datadog/datadog_span_processor.rb', line 56

def on_start(span, _parent_context)
  context = span.context
  trace_id = context.trace_id

  lock do
    if all_spans_count(traces_spans_count) >= max_queue_size
      # instead of just dropping all new spans, dd-trace-rb drops a random trace
      # https://github.com/DataDog/dd-trace-rb/blob/c6fbf2410a60495f1b2d8912bf7ea7dc63422141/lib/ddtrace/buffer.rb#L34-L36
      # It allows for a more fair usage of the queue when under stress load,
      # and will create proportional representation of code paths being instrumented at stress time.
      unfinished_trace_id = fetch_unfinished_trace_id

      # if there are no unfinished traces able to be dropped, don't add more spans, and return early
      if unfinished_trace_id.nil?
        OpenTelemetry.logger.warn('Max spans for all traces, spans will be dropped')
        @_spans_dropped = true
        return
      end

      drop_unfinished_trace(unfinished_trace_id)
      OpenTelemetry.logger.warn('Max spans for all traces, traces will be dropped')
    end

    if traces[trace_id].nil?
      traces[trace_id] = [span]
      traces_spans_count[trace_id] = 1
    else
      if traces[trace_id].size >= max_trace_size
        OpenTelemetry.logger.warn('Max spans for trace, spans will be dropped')
        @_spans_dropped = true
        return
      end

      traces[trace_id] << span
      traces_spans_count[trace_id] += 1
    end
  end
end
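The eviction policy referenced in the comments above (drop a random unfinished trace rather than every new span) can be sketched as follows. EvictionSketch is a hypothetical name; how the gem's fetch_unfinished_trace_id chooses a trace is not shown on this page, so the random pick via Array#sample is an assumption based on the dd-trace-rb comment.

```ruby
# Hypothetical sketch of "drop a random unfinished trace when full".
class EvictionSketch
  attr_reader :traces

  def initialize(max_queue_size:)
    @max_queue_size = max_queue_size
    @traces = {}   # trace_id => array of spans
    @finished = {} # trace_id => true once the trace is considered complete
  end

  def mark_finished(trace_id)
    @finished[trace_id] = true
  end

  # Returns true if the span was stored, false if it was dropped.
  def add_span(trace_id, span)
    if total_spans >= @max_queue_size
      # Assumption: pick any unfinished trace at random as the victim.
      victim = @traces.keys.reject { |id| @finished[id] }.sample
      # Nothing can be evicted, so the incoming span is dropped instead.
      return false if victim.nil?

      @traces.delete(victim)
    end

    (@traces[trace_id] ||= []) << span
    true
  end

  def total_spans
    @traces.values.sum(&:size)
  end
end
```

Finished traces are never chosen as victims here, so completed work waiting for export is preserved while the store is under pressure.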
#shutdown(timeout: nil) ⇒ Object
Shuts down the consumer thread and flushes the accumulated buffer. Blocks until the thread has finished.

# File 'lib/opentelemetry/exporters/datadog/datadog_span_processor.rb', line 138

def shutdown(timeout: nil)
  lock do
    @keep_running = false
    @condition.signal
  end

  @thread.join
  force_flush
  @exporter.shutdown
end
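The shutdown sequence (clear the running flag and signal under the lock, join the thread, then flush once more) can be sketched with a minimal consumer thread. WorkerSketch is a hypothetical name, and its work loop is an assumption: the gem's own work method is not shown on this page.

```ruby
# Hypothetical sketch of a periodic consumer thread with a clean shutdown.
class WorkerSketch
  attr_reader :exported

  def initialize(delay_seconds:)
    @mutex = Mutex.new
    @condition = ConditionVariable.new
    @keep_running = true
    @delay_seconds = delay_seconds
    @queue = []
    @exported = []
    @thread = Thread.new { work }
  end

  def enqueue(item)
    @mutex.synchronize { @queue << item }
  end

  def shutdown
    @mutex.synchronize do
      @keep_running = false
      @condition.signal # wake the worker if it is sleeping
    end
    @thread.join # block until the worker loop has exited
    flush        # export whatever accumulated since the last cycle
  end

  private

  # Assumed loop: sleep up to delay_seconds, then flush, until told to stop.
  def work
    loop do
      keep_going = @mutex.synchronize do
        @condition.wait(@mutex, @delay_seconds) if @keep_running
        @keep_running
      end
      break unless keep_going

      flush
    end
  end

  def flush
    batch = @mutex.synchronize { @queue.shift(@queue.size) }
    @exported.concat(batch)
  end
end
```

Because the flag is checked under the same lock that guards the wait, a signal sent before the worker starts waiting is not lost: the worker sees the cleared flag and exits without sleeping.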