Module: Datadog::Tracing::Contrib::Karafka::MessagesPatch
- Defined in:
- lib/datadog/tracing/contrib/karafka/patcher.rb
Overview
Patch to add tracing to Karafka::Messages::Messages
Instance Method Summary
- #configuration ⇒ Object
- #each(&block) ⇒ Object
  each is the most popular access point to Karafka messages, but not the only one. Other access patterns do not have a straightforward tracing avenue (e.g. `my_batch_operation messages.payloads`).
- #propagation ⇒ Object
Instance Method Details
#configuration ⇒ Object
# File 'lib/datadog/tracing/contrib/karafka/patcher.rb', line 13
def configuration
  Datadog.configuration.tracing[:karafka]
end
#each(&block) ⇒ Object
each is the most popular access point to Karafka messages, but not the only one.
Other access patterns do not have a straightforward tracing avenue
(e.g. `my_batch_operation messages.payloads`).
# File 'lib/datadog/tracing/contrib/karafka/patcher.rb', line 26
# NOTE: the receiver and block-variable names are missing from this listing;
# `@messages_array`, `message`, and `message.metadata` below are reconstructed
# assumptions, not verbatim source.
def each(&block)
  @messages_array.each do |message|
    # Continue the producer's trace when distributed tracing is enabled.
    trace_digest = if configuration[:distributed_tracing]
      headers = if message.metadata.respond_to?(:raw_headers)
        message.metadata.raw_headers
      else
        message.metadata.headers
      end
      Karafka.extract(headers)
    end

    Tracing.trace(Ext::SPAN_MESSAGE_CONSUME, continue_from: trace_digest) do |span, trace|
      if Datadog::DataStreams.enabled?
        begin
          headers = if message.metadata.respond_to?(:raw_headers)
            message.metadata.raw_headers
          else
            message.metadata.headers
          end

          Datadog::DataStreams.set_consume_checkpoint(
            type: 'kafka',
            source: message.topic,
            auto_instrumentation: true
          ) { |key| headers[key] }
        rescue => e
          # A checkpoint failure is logged and must not interrupt consumption.
          Datadog.logger.debug("Error setting DSM checkpoint: #{e.class}: #{e}")
        end
      end

      span.set_tag(Ext::TAG_OFFSET, message.metadata.offset)
      span.set_tag(Contrib::Ext::Messaging::TAG_DESTINATION, message.topic)
      span.set_tag(Contrib::Ext::Messaging::TAG_SYSTEM, Ext::TAG_SYSTEM)

      span.resource = message.topic

      yield message
    end
  end
end
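The note about other access patterns can be illustrated with a minimal, self-contained sketch. It assumes nothing from Karafka or Datadog: `Message`, `Batch`, `ToyTracer`, and `BatchTracingPatch` are hypothetical stand-ins, and the patch calls `super` as a simplification rather than mirroring the listing above. It shows only the general idea that a prepended `each` wraps every yielded message in a span, while an accessor that reads the underlying array directly produces none.

```ruby
# Hypothetical stand-ins; none of these are real Karafka or Datadog classes.
Message = Struct.new(:topic, :offset, :payload)

class Batch
  include Enumerable

  def initialize(messages)
    @messages_array = messages
  end

  def each(&block)
    @messages_array.each(&block)
  end

  # Reads the array directly, so it never goes through #each.
  def payloads
    @messages_array.map(&:payload)
  end
end

# Toy tracer that records one entry per span it opens.
module ToyTracer
  SPANS = []

  def self.trace(name, resource:)
    SPANS << { name: name, resource: resource }
    yield
  end
end

# Prepended module: wraps each yielded message in a toy span.
module BatchTracingPatch
  def each
    super do |message|
      ToyTracer.trace('consume', resource: message.topic) { yield message }
    end
  end
end

Batch.prepend(BatchTracingPatch)

batch = Batch.new([Message.new('orders', 0, 'a'), Message.new('orders', 1, 'b')])

batch.each { |message| message.payload } # one span per message
spans_after_each = ToyTracer::SPANS.size

batch.payloads                           # bypasses #each, so no new span
spans_after_payloads = ToyTracer::SPANS.size
```

In this sketch, any `Enumerable` method built on `each` (such as `map` or `select`) would be traced as well, whereas `payloads` would need its own instrumentation.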
#propagation ⇒ Object
# File 'lib/datadog/tracing/contrib/karafka/patcher.rb', line 17
def propagation
  @propagation ||= Contrib::Karafka::Distributed::Propagation.new
end
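The `||=` in `#propagation` is ordinary Ruby memoization: the helper is built on the first call and reused afterwards by the same receiver. A minimal sketch of that behaviour follows, assuming hypothetical stand-ins (`ToyPropagation` and `ToyBatch` are not part of the library) and a counter that exists only to make the single construction visible.

```ruby
# Hypothetical stand-in for a helper object that should be built only once.
class ToyPropagation
  @instances = 0

  class << self
    attr_accessor :instances
  end

  def initialize
    self.class.instances += 1
  end
end

# Hypothetical stand-in for the object the patch is prepended to.
class ToyBatch
  def propagation
    # Built on first access, then cached in an instance variable.
    @propagation ||= ToyPropagation.new
  end
end

batch = ToyBatch.new
first = batch.propagation
second = batch.propagation

same_object = first.equal?(second)    # both calls return the cached object
built = ToyPropagation.instances      # constructor ran a single time

other_batch_object = ToyBatch.new.propagation
shared_across_receivers = first.equal?(other_batch_object)
```

Because the cache lives in an instance variable, each receiver holds its own helper; in the sketch, a second `ToyBatch` builds a separate `ToyPropagation`.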