Class: OpenTelemetry::Instrumentation::ProcessMessageSubscriber
- Inherits:
-
Object
- Object
- OpenTelemetry::Instrumentation::ProcessMessageSubscriber
- Defined in:
- lib/opentelemetry/instrumentation/racecar/process_message_subscriber.rb
Overview
This class contains the ActiveSupport::Notifications (ASN) subscriber that instruments message processing.
Instance Method Summary collapse
- #attributes(payload) ⇒ Object
- #extract_message_key(key) ⇒ Object
- #finish(name, id, payload) ⇒ Object
- #start(_name, _id, payload) ⇒ Object
- #tracer ⇒ Object
Instance Method Details
#attributes(payload) ⇒ Object
36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/opentelemetry/instrumentation/racecar/process_message_subscriber.rb', line 36

# Builds the span attributes describing a consumed Kafka message.
#
# @param payload [Hash] notification payload; reads :topic, :partition,
#   :offset and :key
# @return [Hash{String => Object}] messaging attributes. The
#   'messaging.kafka.message_key' entry is only present when
#   #extract_message_key returns a non-nil value for payload[:key].
def attributes(payload)
  attrs = {
    'messaging.system' => 'kafka',
    'messaging.destination' => payload[:topic],
    'messaging.destination_kind' => 'topic',
    'messaging.kafka.partition' => payload[:partition],
    'messaging.kafka.offset' => payload[:offset]
  }

  # The key is optional and may not be representable as UTF-8, in which
  # case the attribute is omitted rather than set to nil.
  message_key = extract_message_key(payload[:key])
  attrs['messaging.kafka.message_key'] = message_key if message_key
  attrs
end
#extract_message_key(key) ⇒ Object
67 68 69 70 71 72 73 74 |
# File 'lib/opentelemetry/instrumentation/racecar/process_message_subscriber.rb', line 67

# Returns the message key as a valid UTF-8 string, or nil when that is not
# possible.
#
# @param key [String, nil] raw message key
# @return [String, nil] +key+ itself when it is nil or already valid UTF-8;
#   otherwise a UTF-8 transcoded copy, or nil if the key cannot be
#   converted to valid UTF-8
def extract_message_key(key)
  # skip encode if already valid utf8
  return key if key.nil? || (key.encoding == Encoding::UTF_8 && key.valid_encoding?)

  encoded = key.encode(Encoding::UTF_8)
  # Transcoding UTF-8 to UTF-8 is a no-op, so a string that is tagged UTF-8
  # but holds invalid bytes comes back unchanged; treat it as unconvertible.
  encoded if encoded.valid_encoding?
rescue EncodingError
  # Covers UndefinedConversionError as well as InvalidByteSequenceError and
  # ConverterNotFoundError, all of which mean "no usable UTF-8 key".
  nil
end
#finish(name, id, payload) ⇒ Object
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/opentelemetry/instrumentation/racecar/process_message_subscriber.rb', line 51

# Completes the span stored in +payload+ and detaches the two context
# tokens stored next to it.
#
# The three :__opentelemetry_* entries are always removed from +payload+.
# Nothing else happens unless both a span and a context token were present.
#
# @param name [Object] unused
# @param id [Object] unused
# @param payload [Hash] notification payload; may carry :exception_object
# @return [Object, nil] nil on the early exit, otherwise the result of the
#   final Context.detach call
def finish(name, id, payload)
  span, token, parent_token = %i[
    __opentelemetry_span
    __opentelemetry_ctx_token
    __opentelemetry_parent_ctx_token
  ].map { |stored_key| payload.delete(stored_key) }
  return unless span && token

  error = payload[:exception_object]
  if error
    span.record_exception(error)
    span.status = OpenTelemetry::Trace::Status.error("Unhandled exception of type: #{error.class}")
  end

  span.finish
  # Detach the span context first, then the parent context.
  OpenTelemetry::Context.detach(token)
  OpenTelemetry::Context.detach(parent_token)
end
#start(_name, _id, payload) ⇒ Object
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/opentelemetry/instrumentation/racecar/process_message_subscriber.rb', line 18

# Opens a consumer span for the message described by +payload+ and makes it
# the current span.
#
# The context extracted from payload[:headers] is attached first. When that
# context carries a valid span context, the new span is also linked to it.
# The span and both attach tokens are stored in +payload+ under
# :__opentelemetry_* keys so that #finish can pick them up.
#
# @param _name [Object] unused
# @param _id [Object] unused
# @param payload [Hash] notification payload; reads :headers and :topic
#   here, plus whatever #attributes reads
# @return [Hash] +payload+, with the three bookkeeping keys merged in
def start(_name, _id, payload)
  span_attributes = attributes(payload)

  extracted_context = OpenTelemetry.propagation.extract(payload[:headers], getter: GETTER)
  parent_token = OpenTelemetry::Context.attach(extracted_context)

  remote_span_context = OpenTelemetry::Trace.current_span(extracted_context).context
  links = remote_span_context.valid? ? [OpenTelemetry::Trace::Link.new(remote_span_context)] : nil

  span = tracer.start_span(
    "#{payload[:topic]} process",
    kind: :consumer,
    attributes: span_attributes,
    links: links
  )
  span_context = OpenTelemetry::Trace.context_with_span(span)
  token = OpenTelemetry::Context.attach(span_context)

  payload.update(
    __opentelemetry_span: span,
    __opentelemetry_ctx_token: token,
    __opentelemetry_parent_ctx_token: parent_token
  )
end
#tracer ⇒ Object
14 15 16 |
# File 'lib/opentelemetry/instrumentation/racecar/process_message_subscriber.rb', line 14

# Returns the tracer exposed by the Racecar instrumentation instance.
#
# @return [Object] whatever Racecar::Instrumentation.instance.tracer returns
def tracer
  instrumentation = Racecar::Instrumentation.instance
  instrumentation.tracer
end