Module: OpenTelemetry::Instrumentation::RubyKafka::Patches::Client
- Defined in:
- lib/opentelemetry/instrumentation/ruby_kafka/patches/client.rb
Overview
The Client module contains the instrumentation patch the Client#deliver_message and Client#each_message methods.
Instance Method Summary collapse
- #deliver_message(value, topic:, key: nil, headers: {}, partition: nil, partition_key: nil, retries: 1) ⇒ Object
- #each_message(topic:, start_from_beginning: true, max_wait_time: 5, min_bytes: 1, max_bytes: 1_048_576, &block) ⇒ Object
Instance Method Details
#deliver_message(value, topic:, key: nil, headers: {}, partition: nil, partition_key: nil, retries: 1) ⇒ Object
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/opentelemetry/instrumentation/ruby_kafka/patches/client.rb', line 15 def (value, topic:, key: nil, headers: {}, partition: nil, partition_key: nil, retries: 1) attributes = { 'messaging.system' => 'kafka', 'messaging.destination' => topic, 'messaging.destination_kind' => 'topic' } = Utils.(key) attributes['messaging.kafka.message_key'] = if attributes['messaging.kafka.partition'] = partition if partition tracer.in_span("#{topic} publish", attributes: attributes, kind: :producer) do OpenTelemetry.propagation.inject(headers) super end end |
#each_message(topic:, start_from_beginning: true, max_wait_time: 5, min_bytes: 1, max_bytes: 1_048_576, &block) ⇒ Object
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/opentelemetry/instrumentation/ruby_kafka/patches/client.rb', line 33 def (topic:, start_from_beginning: true, max_wait_time: 5, min_bytes: 1, max_bytes: 1_048_576, &block) super do || attributes = { 'messaging.system' => 'kafka', 'messaging.destination' => .topic, 'messaging.destination_kind' => 'topic', 'messaging.kafka.partition' => .partition } = Utils.(.key) attributes['messaging.kafka.message_key'] = if parent_context = OpenTelemetry.propagation.extract(.headers) OpenTelemetry::Context.with_current(parent_context) do tracer.in_span("#{topic} process", attributes: attributes, kind: :consumer) do yield end end end end |