Module: OpenTelemetry::Instrumentation::Rdkafka::Patches::Consumer
- Defined in:
- lib/opentelemetry/instrumentation/rdkafka/patches/consumer.rb
Overview
The Consumer module contains the instrumentation patch for the Consumer class
Instance Method Summary collapse
- #each ⇒ Object
- #each_batch(max_items: 100, bytes_threshold: Float::INFINITY, timeout_ms: 250, yield_on_error: false, &block) ⇒ Object
Instance Method Details
#each ⇒ Object
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/opentelemetry/instrumentation/rdkafka/patches/consumer.rb', line 13 def each super do || attributes = { 'messaging.system' => 'kafka', 'messaging.destination' => .topic, 'messaging.destination_kind' => 'topic', 'messaging.kafka.partition' => .partition, 'messaging.kafka.offset' => .offset } = (.key) attributes['messaging.kafka.message_key'] = if parent_context = OpenTelemetry.propagation.extract(.headers, getter: OpenTelemetry::Common::Propagation.symbol_key_getter) span_context = OpenTelemetry::Trace.current_span(parent_context).context links = [OpenTelemetry::Trace::Link.new(span_context)] if span_context.valid? OpenTelemetry::Context.with_current(parent_context) do tracer.in_span("#{.topic} process", links: links, attributes: attributes, kind: :consumer) do yield end end end end |
#each_batch(max_items: 100, bytes_threshold: Float::INFINITY, timeout_ms: 250, yield_on_error: false, &block) ⇒ Object
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/opentelemetry/instrumentation/rdkafka/patches/consumer.rb', line 38 def each_batch(max_items: 100, bytes_threshold: Float::INFINITY, timeout_ms: 250, yield_on_error: false, &block) super do |, error| if .empty? yield , error else attributes = { 'messaging.system' => 'kafka', 'messaging.destination_kind' => 'topic', 'messaging.kafka.message_count' => .size } links = .map do || trace_context = OpenTelemetry.propagation.extract(.headers, getter: OpenTelemetry::Common::Propagation.symbol_key_getter) span_context = OpenTelemetry::Trace.current_span(trace_context).context OpenTelemetry::Trace::Link.new(span_context) if span_context.valid? end links.compact! tracer.in_span('batch process', attributes: attributes, links: links, kind: :consumer) do yield , error end end end end |