Class: OpenTelemetry::Instrumentation::ProcessMessageSubscriber
- Inherits:
-
Object
- Object
- OpenTelemetry::Instrumentation::ProcessMessageSubscriber
- Defined in:
- lib/opentelemetry/instrumentation/racecar/process_message_subscriber.rb
Overview
This class contains the ASN subsciber that instruments message processing
Instance Method Summary collapse
- #attributes(payload) ⇒ Object
- #finish(name, id, payload) ⇒ Object
- #start(_name, _id, payload) ⇒ Object
- #tracer ⇒ Object
Instance Method Details
#attributes(payload) ⇒ Object
29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/opentelemetry/instrumentation/racecar/process_message_subscriber.rb', line 29 def attributes(payload) attributes = { 'messaging.system' => 'kafka', 'messaging.destination' => payload[:topic], 'messaging.destination_kind' => 'topic', 'messaging.kafka.partition' => payload[:partition], 'messaging.kafka.offset' => payload[:offset] } attributes['messaging.kafka.message_key'] = payload[:key] if payload[:key] attributes end |
#finish(name, id, payload) ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/opentelemetry/instrumentation/racecar/process_message_subscriber.rb', line 42 def finish(name, id, payload) span = payload.delete(:__opentelemetry_span) token = payload.delete(:__opentelemetry_ctx_token) parent_token = payload.delete(:__opentelemetry_parent_ctx_token) return unless span && token if (e = payload[:exception_object]) span.record_exception(e) span.status = OpenTelemetry::Trace::Status.error("Unhandled exception of type: #{e.class}") end span.finish OpenTelemetry::Context.detach(token) OpenTelemetry::Context.detach(parent_token) end |
#start(_name, _id, payload) ⇒ Object
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
# File 'lib/opentelemetry/instrumentation/racecar/process_message_subscriber.rb', line 11 def start(_name, _id, payload) attrs = attributes(payload) parent_context = OpenTelemetry.propagation.extract(payload[:headers], getter: OpenTelemetry::Common::Propagation.symbol_key_getter) parent_token = OpenTelemetry::Context.attach(parent_context) span_context = OpenTelemetry::Trace.current_span(parent_context).context links = [OpenTelemetry::Trace::Link.new(span_context)] if span_context.valid? span = tracer.start_span("#{payload[:topic]} process", kind: :consumer, attributes: attrs, links: links) token = OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span)) payload.merge!( __opentelemetry_span: span, __opentelemetry_ctx_token: token, __opentelemetry_parent_ctx_token: parent_token ) end |
#tracer ⇒ Object
7 8 9 |
# File 'lib/opentelemetry/instrumentation/racecar/process_message_subscriber.rb', line 7 def tracer Racecar::Instrumentation.instance.tracer end |