Class: Karafka::Instrumentation::Vendors::Datadog::LoggerListener
- Inherits: Object
- Extended by: Forwardable
- Includes: Core::Configurable
- Defined in: lib/karafka/instrumentation/vendors/datadog/logger_listener.rb
Overview
A Karafka logger listener for Datadog. It depends on the ‘ddtrace’ gem.
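A minimal usage sketch, assuming the ‘ddtrace’ gem is installed and that the Datadog tracer is exposed through the listener’s ‘client’ setting (an assumption inferred from the ‘client’ references in the method bodies below):

  require 'ddtrace'
  require 'karafka/instrumentation/vendors/datadog/logger_listener'

  # Build the listener and point it at the Datadog tracer
  # (the 'client' setting name is assumed here)
  listener = ::Karafka::Instrumentation::Vendors::Datadog::LoggerListener.new do |config|
    config.client = ::Datadog::Tracing
  end

  # Subscribe it so Karafka publishes worker and error events to it
  Karafka.monitor.subscribe(listener)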
Instance Method Summary
-
#initialize(&block) ⇒ LoggerListener
constructor
A new instance of LoggerListener.
-
#on_error_occurred(event) ⇒ Object
There are many types of errors that can occur in many places, but we provide a single handler for all of them to simplify error instrumentation.
-
#on_worker_process(event) ⇒ Object
Prints info that a given job has started.
-
#on_worker_processed(event) ⇒ Object
Prints info that a given job has finished.
-
#pop_tags ⇒ Object
Pops Datadog’s tags from the logger. This is required when tracing log lines asynchronously to avoid correlating logs of different processes.
-
#push_tags ⇒ Object
Pushes Datadog’s tags to the logger. This is required when tracing log lines asynchronously to correlate logs of the same process together.
- #setup(&block) ⇒ Object
Constructor Details
#initialize(&block) ⇒ LoggerListener
Returns a new instance of LoggerListener.
# File 'lib/karafka/instrumentation/vendors/datadog/logger_listener.rb', line 35

def initialize(&block)
  configure
  setup(&block) if block
end
Instance Method Details
#on_error_occurred(event) ⇒ Object
There are many types of errors that can occur in many places, but we provide a single handler for all of them to simplify error instrumentation.
# File 'lib/karafka/instrumentation/vendors/datadog/logger_listener.rb', line 86

def on_error_occurred(event)
  error = event[:error]
  client.active_span&.set_error(error)

  case event[:type]
  when 'consumer.consume.error'
    error "Consumer consuming error: #{error}"
  when 'consumer.revoked.error'
    error "Consumer on revoked failed due to an error: #{error}"
  when 'consumer.before_enqueue.error'
    error "Consumer before enqueue failed due to an error: #{error}"
  when 'consumer.before_consume.error'
    error "Consumer before consume failed due to an error: #{error}"
  when 'consumer.after_consume.error'
    error "Consumer after consume failed due to an error: #{error}"
  when 'consumer.shutdown.error'
    error "Consumer on shutdown failed due to an error: #{error}"
  when 'worker.process.error'
    fatal "Worker processing failed due to an error: #{error}"
  when 'connection.listener.fetch_loop.error'
    error "Listener fetch loop error: #{error}"
  when 'runner.call.error'
    fatal "Runner crashed due to an error: #{error}"
  when 'app.stopping.error'
    error 'Forceful Karafka server stop'
  when 'librdkafka.error'
    error "librdkafka internal error occurred: #{error}"
  # Those will only occur when retries in the client fail and when they did not stop
  # after back-offs
  when 'connection.client.poll.error'
    error "Data polling error occurred: #{error}"
  else
    # This should never happen. Please contact the maintainers
    raise Errors::UnsupportedCaseError, event
  end
end
#on_worker_process(event) ⇒ Object
Prints info that a given job has started.
# File 'lib/karafka/instrumentation/vendors/datadog/logger_listener.rb', line 49

def on_worker_process(event)
  current_span = client.trace('karafka.consumer', service: service_name)

  job = event[:job]
  job_type = job.class.to_s.split('::').last
  consumer = job.executor.topic.consumer
  topic = job.executor.topic.name

  current_span.resource = "#{consumer}#consume"
  info "[#{job.id}] #{job_type} job for #{consumer} on #{topic} started"
end
#on_worker_processed(event) ⇒ Object
Prints info that a given job has finished.
# File 'lib/karafka/instrumentation/vendors/datadog/logger_listener.rb', line 67

def on_worker_processed(event)
  job = event[:job]
  time = event[:time]
  job_type = job.class.to_s.split('::').last
  consumer = job.executor.topic.consumer
  topic = job.executor.topic.name

  info "[#{job.id}] #{job_type} job for #{consumer} on #{topic} finished in #{time}ms"

  client.active_span&.finish
end
#pop_tags ⇒ Object
Pops Datadog’s tags from the logger. This is required when tracing log lines asynchronously to avoid correlating logs of different processes.
# File 'lib/karafka/instrumentation/vendors/datadog/logger_listener.rb', line 146

def pop_tags
  return unless Karafka.logger.respond_to?(:pop_tags)

  Karafka.logger.pop_tags
end
#push_tags ⇒ Object
Pushes Datadog’s tags to the logger. This is required when tracing log lines asynchronously to correlate logs of the same process together.
# File 'lib/karafka/instrumentation/vendors/datadog/logger_listener.rb', line 137

def push_tags
  return unless Karafka.logger.respond_to?(:push_tags)

  Karafka.logger.push_tags(client.log_correlation)
end
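A short illustrative sketch of how these two methods can bracket asynchronously written log lines so they carry Datadog correlation data; it assumes Karafka.logger supports tagged logging (e.g. ActiveSupport::TaggedLogging), and the logged message is purely an example:

  listener.push_tags
  Karafka.logger.info('Processing a batch') # line is tagged with trace correlation data
  listener.pop_tags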
#setup(&block) ⇒ Object
We define this alias to be consistent with ‘WaterDrop#setup’.
# File 'lib/karafka/instrumentation/vendors/datadog/logger_listener.rb', line 42

def setup(&block)
  configure(&block)
end
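#setup can also be called on an already built listener. A sketch equivalent to passing the block to the constructor, using the same assumed ‘client’ setting as in the overview example:

  listener = ::Karafka::Instrumentation::Vendors::Datadog::LoggerListener.new

  # Same effect as passing the block to #initialize
  listener.setup do |config|
    config.client = ::Datadog::Tracing
  end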