Class: OpenTelemetry::Instrumentation::ProcessMessageSubscriber

Inherits:
Object
  • Object
show all
Defined in:
lib/opentelemetry/instrumentation/racecar/process_message_subscriber.rb

Overview

This class contains the ActiveSupport::Notifications (ASN) subscriber that instruments message processing.

Instance Method Summary collapse

Instance Method Details

#attributes(payload) ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/opentelemetry/instrumentation/racecar/process_message_subscriber.rb', line 29

# Builds the span attribute hash for a processed Kafka message.
#
# Reads +:topic+, +:partition+, +:offset+ and (optionally) +:key+ from the
# payload. The message key attribute is added only when +payload[:key]+ is
# truthy.
#
# @param payload [Hash] event payload (hash-like; accessed with symbol keys)
# @return [Hash{String => Object}] attributes keyed by messaging attribute name
def attributes(payload)
  {
    'messaging.system' => 'kafka',
    'messaging.destination' => payload[:topic],
    'messaging.destination_kind' => 'topic',
    'messaging.kafka.partition' => payload[:partition],
    'messaging.kafka.offset' => payload[:offset]
  }.tap do |span_attributes|
    message_key = payload[:key]
    # Skip the key attribute entirely for nil/false keys rather than storing nil.
    span_attributes['messaging.kafka.message_key'] = message_key if message_key
  end
end

#finish(name, id, payload) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/opentelemetry/instrumentation/racecar/process_message_subscriber.rb', line 42

# Completes the span that #start stored in the payload and detaches the
# contexts that #start attached.
#
# The three bookkeeping keys (+:__opentelemetry_span+,
# +:__opentelemetry_ctx_token+, +:__opentelemetry_parent_ctx_token+) are always
# removed from the payload. If either the span or its context token is
# missing, nothing else happens.
#
# @param name unused by this method
# @param id unused by this method
# @param payload [Hash] event payload (hash-like); mutated in place
# @return [Object, nil] nil when there is no span/token; otherwise the result
#   of detaching the parent context token
def finish(name, id, payload)
  span, token, parent_token = %i[
    __opentelemetry_span
    __opentelemetry_ctx_token
    __opentelemetry_parent_ctx_token
  ].map { |bookkeeping_key| payload.delete(bookkeeping_key) }
  return if !span || !token

  error = payload[:exception_object]
  if error
    span.record_exception(error)
    span.status = OpenTelemetry::Trace::Status.error("Unhandled exception of type: #{error.class}")
  end

  span.finish

  # Detach in the reverse order of attachment: #start attaches the parent
  # context first and the span context second.
  OpenTelemetry::Context.detach(token)
  OpenTelemetry::Context.detach(parent_token)
end

#start(_name, _id, payload) ⇒ Object



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/opentelemetry/instrumentation/racecar/process_message_subscriber.rb', line 11

# Opens a consumer span for the message described by the payload.
#
# Steps, in order:
# 1. Build span attributes from the payload.
# 2. Extract a context from +payload[:headers]+ and attach it.
# 3. If the extracted context carries a valid span context, link to it.
# 4. Start a +:consumer+ span named "<topic> process" and attach a context
#    containing it.
# 5. Store the span and both context tokens in the payload so #finish can
#    close the span and detach the contexts.
#
# @param _name unused
# @param _id unused
# @param payload [Hash] event payload (hash-like); mutated in place
# @return [Hash] the payload, as returned by +merge!+
def start(_name, _id, payload)
  span_attributes = attributes(payload)

  propagator = OpenTelemetry.propagation
  header_getter = OpenTelemetry::Common::Propagation.symbol_key_getter
  extracted_context = propagator.extract(payload[:headers], getter: header_getter)
  parent_token = OpenTelemetry::Context.attach(extracted_context)

  remote_span_context = OpenTelemetry::Trace.current_span(extracted_context).context
  # links stays nil (not an empty array) when there is nothing valid to link to.
  links = remote_span_context.valid? ? [OpenTelemetry::Trace::Link.new(remote_span_context)] : nil

  span_name = "#{payload[:topic]} process"
  span = tracer.start_span(span_name, kind: :consumer, attributes: span_attributes, links: links)
  span_token = OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span))

  bookkeeping = {
    __opentelemetry_span: span,
    __opentelemetry_ctx_token: span_token,
    __opentelemetry_parent_ctx_token: parent_token
  }
  payload.merge!(bookkeeping)
end

#tracer ⇒ Object



7
8
9
# File 'lib/opentelemetry/instrumentation/racecar/process_message_subscriber.rb', line 7

# Returns the tracer exposed by the object that
# +Racecar::Instrumentation.instance+ returns.
#
# @return [Object] the value of +Racecar::Instrumentation.instance.tracer+
def tracer
  instrumentation = Racecar::Instrumentation.instance
  instrumentation.tracer
end