Module: OpenTelemetry::Instrumentation::RubyKafka::Patches::Producer

Defined in:
lib/opentelemetry/instrumentation/ruby_kafka/patches/producer.rb

Overview

The Producer module contains the instrumentation patch the Producer#produce method

Instance Method Summary collapse

Instance Method Details

#produce(value, topic:, key: nil, headers: {}, partition: nil, partition_key: nil, create_time: Time.now) ⇒ Object



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/opentelemetry/instrumentation/ruby_kafka/patches/producer.rb', line 13

def produce(value, topic:, key: nil, headers: {}, partition: nil, partition_key: nil, create_time: Time.now)
  attributes = {
    'messaging.system' => 'kafka',
    'messaging.destination' => topic,
    'messaging.destination_kind' => 'topic'
  }

  # If trace context is present in headers, extract and use it as parent. If there is _no_ trace context key
  # in the headers, OpenTelemetry.propagation.extract will return an unmodified copy of the the current
  # Thread's context, so the next two lines preserve the correct Thread-local context.
  ctx = OpenTelemetry.propagation.extract(headers)
  OpenTelemetry::Context.with_current(ctx) do
    tracer.in_span("#{topic} publish", attributes: attributes, kind: :producer) do
      OpenTelemetry.propagation.inject(headers)
      super
    end
  end
end