Module: NewRelic::Agent::Instrumentation::Rdkafka
- Included in:
- NewRelic::Agent::Instrumentation::RdkafkaConsumer::Prepend, NewRelic::Agent::Instrumentation::RdkafkaProducer::Prepend
- Defined in:
- lib/new_relic/agent/instrumentation/rdkafka/instrumentation.rb
Defined Under Namespace
Modules: Chain
Constant Summary collapse
- MESSAGING_LIBRARY =
'Kafka'
- PRODUCE =
'Produce'
- CONSUME =
'Consume'
- INSTRUMENTATION_NAME =
'Rdkafka'
Instance Method Summary collapse
- #create_kafka_metrics(action:, topic:) ⇒ Object
- #each_with_new_relic(message) ⇒ Object
- #produce_with_new_relic(*args) ⇒ Object
Instance Method Details
#create_kafka_metrics(action:, topic:) ⇒ Object
53 54 55 56 57 58 59 60 61 62 |
# File 'lib/new_relic/agent/instrumentation/rdkafka/instrumentation.rb', line 53 def create_kafka_metrics(action:, topic:) hosts = [] # both 'bootstrap.servers' and 'metadata.broker.list' are valid ways to specify the Kafka server hosts << @nr_config[:'bootstrap.servers'] if @nr_config[:'bootstrap.servers'] hosts << @nr_config[:'metadata.broker.list'] if @nr_config[:'metadata.broker.list'] hosts.each do |host| NewRelic::Agent.record_metric("MessageBroker/Kafka/Nodes/#{host}/#{action}/#{topic}", 1) NewRelic::Agent.record_metric("MessageBroker/Kafka/Nodes/#{host}", 1) end end |
#each_with_new_relic(message) ⇒ Object
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/new_relic/agent/instrumentation/rdkafka/instrumentation.rb', line 35 def each_with_new_relic() NewRelic::Agent.record_instrumentation_invocation(INSTRUMENTATION_NAME) headers = &.headers || {} topic_name = &.topic NewRelic::Agent::Messaging.( library: MESSAGING_LIBRARY, destination_type: :topic, destination_name: topic_name, headers: headers, action: :consume ) do create_kafka_metrics(action: CONSUME, topic: topic_name) yield end end |
#produce_with_new_relic(*args) ⇒ Object
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
# File 'lib/new_relic/agent/instrumentation/rdkafka/instrumentation.rb', line 15 def produce_with_new_relic(*args) NewRelic::Agent.record_instrumentation_invocation(INSTRUMENTATION_NAME) topic_name = args[0][:topic] segment = NewRelic::Agent::Tracer.( action: :produce, library: MESSAGING_LIBRARY, destination_type: :topic, destination_name: topic_name ) create_kafka_metrics(action: PRODUCE, topic: topic_name) headers = args[0][:headers] || {} ::NewRelic::Agent::DistributedTracing.insert_distributed_trace_headers(headers) NewRelic::Agent::Tracer.capture_segment_error(segment) { yield(headers) } ensure segment&.finish end |