Module: NewRelic::Agent::Messaging
Overview
This module contains helper methods to facilitate instrumentation of message brokers.
Constant Summary
- RABBITMQ_TRANSPORT_TYPE = 'RabbitMQ'
- ATTR_DESTINATION = AttributeFilter::DST_TRANSACTION_EVENTS | AttributeFilter::DST_TRANSACTION_TRACER | AttributeFilter::DST_ERROR_COLLECTOR
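ATTR_DESTINATION combines three destination flags with a bitwise OR, so a single value can name several places an attribute may be sent. The sketch below illustrates the idea only: DemoFilter, its flag values, and destination_enabled? are hypothetical stand-ins, and the agent's real AttributeFilter constants may use different values.

```ruby
# Hypothetical stand-in for AttributeFilter. The flag values are assumed
# for illustration and are not taken from the agent's source.
module DemoFilter
  DST_NONE               = 0b000
  DST_TRANSACTION_EVENTS = 0b001
  DST_TRANSACTION_TRACER = 0b010
  DST_ERROR_COLLECTOR    = 0b100
end

# Same shape as ATTR_DESTINATION: OR the flags together into one mask.
DEMO_ATTR_DESTINATION = DemoFilter::DST_TRANSACTION_EVENTS |
                        DemoFilter::DST_TRANSACTION_TRACER |
                        DemoFilter::DST_ERROR_COLLECTOR

# True when +mask+ includes +destination+.
def destination_enabled?(mask, destination)
  (mask & destination) != 0
end

puts destination_enabled?(DEMO_ATTR_DESTINATION, DemoFilter::DST_ERROR_COLLECTOR)
puts destination_enabled?(DemoFilter::DST_NONE, DemoFilter::DST_ERROR_COLLECTOR)
```

Under this reading, attributes added with DST_NONE are kept out of every destination, while those added with ATTR_DESTINATION reach transaction events, transaction traces, and error reports.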
Instance Method Summary
- #start_amqp_consume_segment(library:, destination_name:, delivery_info:, message_properties:, exchange_type: nil, queue_name: nil, start_time: nil) ⇒ NewRelic::Agent::Transaction::MessageBrokerSegment
  Start a MessageBroker segment configured to trace an AMQP consume.
- #start_amqp_publish_segment(library:, destination_name:, headers: nil, routing_key: nil, reply_to: nil, correlation_id: nil, exchange_type: nil) ⇒ NewRelic::Agent::Transaction::MessageBrokerSegment
  Start a MessageBroker segment configured to trace an AMQP publish.
- #start_message_broker_segment(action: nil, library: nil, destination_type: nil, destination_name: nil, headers: nil, parameters: nil, start_time: nil) ⇒ NewRelic::Agent::Transaction::MessageBrokerSegment
  Start a MessageBroker segment configured to trace a messaging action.
- #wrap_amqp_consume_transaction(library: nil, destination_name: nil, delivery_info: nil, message_properties: nil, exchange_type: nil, queue_name: nil, &block) ⇒ Object
  Wrap a MessageBroker transaction trace around an AMQP message-handling block.
- #wrap_message_broker_consume_transaction(library:, destination_type:, destination_name:, headers: nil, routing_key: nil, queue_name: nil, exchange_type: nil, reply_to: nil, correlation_id: nil, action: nil) ⇒ Object
  Wrap a MessageBroker transaction trace around a message-handling block.
Instance Method Details
#start_amqp_consume_segment(library:, destination_name:, delivery_info:, message_properties:, exchange_type: nil, queue_name: nil, start_time: nil) ⇒ NewRelic::Agent::Transaction::MessageBrokerSegment
Start a MessageBroker segment configured to trace an AMQP consume. Finishing this segment will handle timing and recording of the proper metrics for New Relic’s messaging features. This method is a convenience wrapper around NewRelic::Agent::Tracer.start_message_broker_segment.
# File 'lib/new_relic/agent/messaging.rb', line 245
def start_amqp_consume_segment(library:,
  destination_name:,
  delivery_info:,
  message_properties:,
  exchange_type: nil,
  queue_name: nil,
  start_time: nil)

  segment = Tracer.start_message_broker_segment(
    action: :consume,
    library: library,
    destination_name: destination_name,
    destination_type: :exchange,
    headers: message_properties[:headers],
    start_time: start_time
  )

  if segment_parameters_enabled?
    if message_properties[:headers] && !message_properties[:headers].empty?
      non_cat_headers = CrossAppTracing.reject_messaging_cat_headers(message_properties[:headers])
      non_synth_headers = SyntheticsMonitor.reject_messaging_synthetics_header(non_cat_headers)
      segment.params[:headers] = non_synth_headers unless non_synth_headers.empty?
    end

    segment.params[:routing_key] = delivery_info[:routing_key] if delivery_info[:routing_key]
    segment.params[:reply_to] = message_properties[:reply_to] if message_properties[:reply_to]
    segment.params[:queue_name] = queue_name if queue_name
    segment.params[:exchange_type] = exchange_type if exchange_type
    segment.params[:exchange_name] = delivery_info[:exchange_name] if delivery_info[:exchange_name]
    segment.params[:correlation_id] = message_properties[:correlation_id] if message_properties[:correlation_id]
  end

  segment
end
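The parameter handling above follows one rule: a value is recorded on the segment only when it is present, and agent-internal headers are filtered out first. The following is a minimal sketch of that rule outside the agent. The helper demo_consume_params and the INTERNAL_HEADERS list are hypothetical; the real filtering is done by CrossAppTracing and SyntheticsMonitor, whose header lists may differ.

```ruby
# Assumed names of agent-internal headers, for illustration only.
INTERNAL_HEADERS = %w[NewRelicID NewRelicTransaction NewRelicSynthetics].freeze

# Hypothetical helper mirroring the conditional assignments in
# start_amqp_consume_segment: only present values end up in the hash.
def demo_consume_params(delivery_info:, message_properties:, queue_name: nil, exchange_type: nil)
  params = {}

  headers = message_properties[:headers]
  if headers && !headers.empty?
    visible = headers.reject { |name, _value| INTERNAL_HEADERS.include?(name) }
    params[:headers] = visible unless visible.empty?
  end

  params[:routing_key]    = delivery_info[:routing_key] if delivery_info[:routing_key]
  params[:reply_to]       = message_properties[:reply_to] if message_properties[:reply_to]
  params[:queue_name]     = queue_name if queue_name
  params[:exchange_type]  = exchange_type if exchange_type
  params[:exchange_name]  = delivery_info[:exchange_name] if delivery_info[:exchange_name]
  params[:correlation_id] = message_properties[:correlation_id] if message_properties[:correlation_id]
  params
end

p demo_consume_params(
  delivery_info: { routing_key: 'orders.created', exchange_name: 'orders' },
  message_properties: { headers: { 'NewRelicID' => 'abc', 'tenant' => 'acme' } },
  queue_name: 'order-worker'
)
```

Keys such as :reply_to and :correlation_id are absent from the result because the sample message does not carry them, which matches the "if present" guards in the method.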
#start_amqp_publish_segment(library:, destination_name:, headers: nil, routing_key: nil, reply_to: nil, correlation_id: nil, exchange_type: nil) ⇒ NewRelic::Agent::Transaction::MessageBrokerSegment
Start a MessageBroker segment configured to trace an AMQP publish. Finishing this segment will handle timing and recording of the proper metrics for New Relic’s messaging features. This method is a convenience wrapper around NewRelic::Agent::Tracer.start_message_broker_segment.
# File 'lib/new_relic/agent/messaging.rb', line 185
def start_amqp_publish_segment(library:,
  destination_name:,
  headers: nil,
  routing_key: nil,
  reply_to: nil,
  correlation_id: nil,
  exchange_type: nil)

  raise ArgumentError, 'missing required argument: headers' if headers.nil? && CrossAppTracing.cross_app_enabled?

  # The following line needs else branch coverage
  original_headers = headers.nil? ? nil : headers.dup # rubocop:disable Style/SafeNavigation

  segment = Tracer.start_message_broker_segment(
    action: :produce,
    library: library,
    destination_type: :exchange,
    destination_name: destination_name,
    headers: headers
  )

  if segment_parameters_enabled?
    segment.params[:headers] = original_headers if original_headers && !original_headers.empty?
    segment.params[:routing_key] = routing_key if routing_key
    segment.params[:reply_to] = reply_to if reply_to
    segment.params[:correlation_id] = correlation_id if correlation_id
    segment.params[:exchange_type] = exchange_type if exchange_type
  end

  segment
end
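The method copies headers before handing them to the tracer and later records the copy, which suggests the tracer may add its own entries to the hash it receives. Assuming that is the reason, the sketch below shows why the copy matters. Both helpers and the header name DemoTraceHeader are invented for illustration.

```ruby
# Hypothetical stand-in for the tracer adding its own header to the
# outgoing message. The header name is invented for this sketch.
def demo_inject_trace_headers!(headers)
  headers['DemoTraceHeader'] = 'trace-payload'
  headers
end

def demo_publish(headers)
  # Same idiom as start_amqp_publish_segment: copy before handing off.
  original_headers = headers.nil? ? nil : headers.dup
  demo_inject_trace_headers!(headers) if headers
  { sent: headers, recorded: original_headers }
end

result = demo_publish({ 'tenant' => 'acme' })
p result[:sent]     # carries the injected header
p result[:recorded] # only what the caller supplied
```

Because dup is a shallow copy, this protects against added or removed keys only, which is all the recorded segment parameter needs here.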
#start_message_broker_segment(action: nil, library: nil, destination_type: nil, destination_name: nil, headers: nil, parameters: nil, start_time: nil) ⇒ NewRelic::Agent::Transaction::MessageBrokerSegment
Start a MessageBroker segment configured to trace a messaging action. Finishing this segment will handle timing and recording of the proper metrics for New Relic’s messaging features.
# File 'lib/new_relic/agent/messaging.rb', line 54
def start_message_broker_segment(action: nil,
  library: nil,
  destination_type: nil,
  destination_name: nil,
  headers: nil,
  parameters: nil,
  start_time: nil)

  Tracer.start_message_broker_segment(
    action: action,
    library: library,
    destination_type: destination_type,
    destination_name: destination_name,
    headers: headers,
    parameters: parameters,
    start_time: start_time
  )
end
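Since metrics are recorded when the segment is finished, callers will usually want to finish it in an ensure clause so that a failing broker call is still timed. The sketch below shows that pattern with the messaging object passed in as a parameter so it runs without the agent. DemoMessaging, DemoSegment, DemoQueue, the 'DemoMQ' library name, and the :queue destination type are all assumptions made for this example.

```ruby
# Hypothetical publisher. +messaging+ must respond to
# start_message_broker_segment with the keyword arguments documented
# above; +queue+ is any object that responds to #name and #push.
def demo_publish_with_segment(messaging, queue, payload)
  segment = messaging.start_message_broker_segment(
    action: :produce,
    library: 'DemoMQ', # invented library name
    destination_type: :queue,
    destination_name: queue.name,
    headers: {}
  )
  begin
    queue.push(payload)
  ensure
    # Finishing the segment is what records timing and metrics.
    segment.finish if segment
  end
end

# Minimal fakes so the sketch runs without the agent installed.
DemoSegment = Struct.new(:finished) do
  def finish
    self.finished = true
  end
end

class DemoMessaging
  attr_reader :segment, :last_args

  def start_message_broker_segment(**args)
    @last_args = args
    @segment = DemoSegment.new(false)
  end
end

DemoQueue = Struct.new(:name, :items) do
  def push(item)
    items << item
  end
end

messaging = DemoMessaging.new
queue = DemoQueue.new('jobs', [])
demo_publish_with_segment(messaging, queue, 'hello')
puts messaging.segment.finished
```

In an instrumented process, an object exposing the method documented above would take the place of the fake.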
#wrap_amqp_consume_transaction(library: nil, destination_name: nil, delivery_info: nil, message_properties: nil, exchange_type: nil, queue_name: nil, &block) ⇒ Object
Wrap a MessageBroker transaction trace around an AMQP message-handling block. This API is intended to be used in AMQP-specific library instrumentation when a “push”-style callback is invoked to handle an incoming message.
# File 'lib/new_relic/agent/messaging.rb', line 306
def wrap_amqp_consume_transaction(library: nil,
  destination_name: nil,
  delivery_info: nil,
  message_properties: nil,
  exchange_type: nil,
  queue_name: nil,
  &block)

  wrap_message_broker_consume_transaction(library: library,
    destination_type: :exchange,
    destination_name: Instrumentation::Bunny.exchange_name(destination_name),
    routing_key: delivery_info[:routing_key],
    reply_to: message_properties[:reply_to],
    queue_name: queue_name,
    exchange_type: exchange_type,
    headers: message_properties[:headers],
    correlation_id: message_properties[:correlation_id],
    &block)
end
#wrap_message_broker_consume_transaction(library:, destination_type:, destination_name:, headers: nil, routing_key: nil, queue_name: nil, exchange_type: nil, reply_to: nil, correlation_id: nil, action: nil) ⇒ Object
Wrap a MessageBroker transaction trace around a message-handling block. This API is intended to be used in library instrumentation when a “push”-style callback is invoked to handle an incoming message.
# File 'lib/new_relic/agent/messaging.rb', line 112
def wrap_message_broker_consume_transaction(library:,
  destination_type:,
  destination_name:,
  headers: nil,
  routing_key: nil,
  queue_name: nil,
  exchange_type: nil,
  reply_to: nil,
  correlation_id: nil,
  action: nil)

  state = Tracer.state
  return yield if state.current_transaction

  txn = nil

  begin
    txn_name = transaction_name(library, destination_type, destination_name, action)

    txn = Tracer.start_transaction(name: txn_name, category: :message)

    if headers
      NewRelic::Agent::DistributedTracing::accept_distributed_trace_headers(headers, library) # to handle the new w3c headers
      txn.distributed_tracer.(headers, state, library) # to do the expected old things
      CrossAppTracing.reject_messaging_cat_headers(headers).each do |k, v|
        txn.add_agent_attribute(:"message.headers.#{k}", v, AttributeFilter::DST_NONE) unless v.nil?
      end
    end

    txn.add_agent_attribute(:'message.routingKey', routing_key, ATTR_DESTINATION) if routing_key
    txn.add_agent_attribute(:'message.exchangeType', exchange_type, AttributeFilter::DST_NONE) if exchange_type
    txn.add_agent_attribute(:'message.correlationId', correlation_id, AttributeFilter::DST_NONE) if correlation_id
    txn.add_agent_attribute(:'message.queueName', queue_name, ATTR_DESTINATION) if queue_name
    txn.add_agent_attribute(:'message.replyTo', reply_to, AttributeFilter::DST_NONE) if reply_to
  rescue => e
    NewRelic::Agent.logger.error('Error starting Message Broker consume transaction', e)
  end

  yield
ensure
  begin
    # the following line needs else branch coverage
    txn.finish if txn # rubocop:disable Style/SafeNavigation
  rescue => e
    NewRelic::Agent.logger.error('Error stopping Message Broker consume transaction', e)
  end
end
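The control flow of this method can be reduced to three rules: run the handler directly when a transaction is already active, never let an instrumentation error stop the handler from running, and finish the transaction in ensure only if one was started. The following is a minimal sketch of those rules using a hypothetical DemoTracer in place of the agent's tracer. The class, its methods, and the transaction name are invented for illustration.

```ruby
# Hypothetical stand-in for the agent's tracer, used only to make the
# control flow runnable without the agent.
class DemoTracer
  attr_reader :events
  attr_accessor :current_transaction

  def initialize(fail_on_start: false)
    @fail_on_start = fail_on_start
    @events = []
    @current_transaction = nil
  end

  def start_transaction(name)
    raise 'instrumentation failure' if @fail_on_start

    @events << "start #{name}"
    name
  end

  def finish_transaction(name)
    @events << "finish #{name}"
  end
end

def demo_wrap_consume(tracer, name)
  # Already inside a transaction: just run the handler.
  return yield if tracer.current_transaction

  txn = nil
  begin
    txn = tracer.start_transaction(name)
  rescue => e
    # Instrumentation errors are recorded, not raised to the caller.
    tracer.events << "error #{e.message}"
  end

  yield # the handler runs even if starting the transaction failed
ensure
  tracer.finish_transaction(txn) if txn
end

tracer = DemoTracer.new
result = demo_wrap_consume(tracer, 'demo/consume') { :handled }
p result
p tracer.events
```

The handler's return value is passed through to the caller, and exceptions raised by the handler itself still propagate after the ensure clause has finished the transaction.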