Class: Karafka::Instrumentation::Vendors::Datadog::LoggerListener
- Inherits: Object
- Extended by: Forwardable
- Includes: Core::Configurable
- Defined in: lib/karafka/instrumentation/vendors/datadog/logger_listener.rb
Overview
A Karafka logger listener for Datadog. It depends on the 'ddtrace' gem.
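For orientation, here is a minimal wiring sketch. This is an assumption-laden example, not part of this class's documented API: it presumes a configured Karafka application, the ddtrace tracer loaded as Datadog::Tracing, and subscription through Karafka's standard instrumentation bus.

require 'ddtrace'

# A minimal sketch: build the listener, point it at the Datadog tracer
# client and subscribe it to Karafka's instrumentation notifications.
listener = ::Karafka::Instrumentation::Vendors::Datadog::LoggerListener.new do |config|
  config.client = Datadog::Tracing
end

Karafka.monitor.subscribe(listener)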
Instance Method Summary
- #initialize(&block) ⇒ LoggerListener (constructor)
  A new instance of LoggerListener.
- #on_error_occurred(event) ⇒ Object
  There are many types of errors that can occur in many places, but we provide a single handler for all of them to simplify error instrumentation.
- #on_worker_process(event) ⇒ Object
  Prints info about the fact that a given job has started.
- #on_worker_processed(event) ⇒ Object
  Prints info about the fact that a given job has finished.
- #pop_tags ⇒ Object
  Pops Datadog's tags from the logger. This is required when tracing log lines asynchronously, to avoid correlating logs from different processes.
- #push_tags ⇒ Object
  Pushes Datadog's tags to the logger. This is required when tracing log lines asynchronously, to correlate logs of the same process together.
- #setup(&block) ⇒ Object
  We define this alias to be consistent with WaterDrop#setup.
Constructor Details
#initialize(&block) ⇒ LoggerListener
Returns a new instance of LoggerListener.
# File 'lib/karafka/instrumentation/vendors/datadog/logger_listener.rb', line 35

def initialize(&block)
  configure
  setup(&block) if block
  @job_types_cache = {}
end
Instance Method Details
#on_error_occurred(event) ⇒ Object
There are many types of errors that can occur in many places, but we provide a single handler for all of them to simplify error instrumentation.
# File 'lib/karafka/instrumentation/vendors/datadog/logger_listener.rb', line 87

def on_error_occurred(event)
  error = event[:error]
  client.active_span&.set_error(error)

  case event[:type]
  when 'consumer.consume.error'
    error "Consumer consuming error: #{error}"
  when 'consumer.revoked.error'
    error "Consumer on revoked failed due to an error: #{error}"
  when 'consumer.before_schedule.error'
    error "Consumer before schedule failed due to an error: #{error}"
  when 'consumer.before_consume.error'
    error "Consumer before consume failed due to an error: #{error}"
  when 'consumer.after_consume.error'
    error "Consumer after consume failed due to an error: #{error}"
  when 'consumer.shutdown.error'
    error "Consumer on shutdown failed due to an error: #{error}"
  when 'consumer.tick.error'
    error "Consumer tick failed due to an error: #{error}"
  when 'consumer.eofed.error'
    error "Consumer eofed failed due to an error: #{error}"
  when 'worker.process.error'
    fatal "Worker processing failed due to an error: #{error}"
  when 'connection.listener.fetch_loop.error'
    error "Listener fetch loop error: #{error}"
  when 'runner.call.error'
    fatal "Runner crashed due to an error: #{error}"
  when 'app.stopping.error'
    error 'Forceful Karafka server stop'
  when 'swarm.supervisor.error'
    fatal "Swarm supervisor crashed due to an error: #{error}"
  when 'librdkafka.error'
    error "librdkafka internal error occurred: #{error}"
  # Those will only occur when retries in the client fail and when they did not stop
  # after back-offs
  when 'connection.client.poll.error'
    error "Data polling error occurred: #{error}"
  else
    # This should never happen. Please contact the maintainers
    raise Errors::UnsupportedCaseError, event
  end
end
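As an illustration of how an event reaches this handler, the sketch below publishes a notification carrying the two payload keys the method reads (:error and :type). The 'error.occurred' event name and the exact payload shape are assumptions inferred from the keys used above, not guarantees of this class's contract.

# A hedged sketch of an error notification this handler would receive.
Karafka.monitor.instrument(
  'error.occurred',
  error: StandardError.new('boom'),
  type: 'consumer.consume.error'
)
# => marks the active span as errored and logs "Consumer consuming error: boom"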
#on_worker_process(event) ⇒ Object
Prints info about the fact that a given job has started.
# File 'lib/karafka/instrumentation/vendors/datadog/logger_listener.rb', line 50

def on_worker_process(event)
  current_span = client.trace('karafka.consumer', service: service_name)

  job = event[:job]
  job_type = fetch_job_type(job.class)
  consumer = job.executor.topic.consumer
  topic = job.executor.topic.name

  current_span.resource = "#{consumer}##{job.class.action}"
  info "[#{job.id}] #{job_type} job for #{consumer} on #{topic} started"
end
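Note that the span opened here stays active until #on_worker_processed finishes it, so the two hooks together delimit a single 'karafka.consumer' trace per job.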
#on_worker_processed(event) ⇒ Object
Prints info about the fact that a given job has finished.
# File 'lib/karafka/instrumentation/vendors/datadog/logger_listener.rb', line 68

def on_worker_processed(event)
  job = event[:job]
  time = event[:time]
  job_type = job.class.to_s.split('::').last
  consumer = job.executor.topic.consumer
  topic = job.executor.topic.name

  info "[#{job.id}] #{job_type} job for #{consumer} on #{topic} finished in #{time}ms"

  client.active_span&.finish
end
#pop_tags ⇒ Object
Pops Datadog's tags from the logger. This is required when tracing log lines asynchronously, to avoid correlating logs from different processes.
# File 'lib/karafka/instrumentation/vendors/datadog/logger_listener.rb', line 153

def pop_tags
  return unless Karafka.logger.respond_to?(:pop_tags)

  Karafka.logger.pop_tags
end
#push_tags ⇒ Object
Pushes Datadog's tags to the logger. This is required when tracing log lines asynchronously, to correlate logs of the same process together.
# File 'lib/karafka/instrumentation/vendors/datadog/logger_listener.rb', line 144

def push_tags
  return unless Karafka.logger.respond_to?(:push_tags)

  Karafka.logger.push_tags(client.log_correlation)
end
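A brief sketch of how the pair brackets correlated logging; the guard clauses above make both methods no-ops unless Karafka.logger supports tagging (for example, an ActiveSupport::TaggedLogging logger). The surrounding calls here are illustrative only.

# Illustrative bracket: correlation tags pushed before logging and
# popped afterwards, so only this process' lines carry them.
listener.push_tags
Karafka.logger.info('processing batch') # tagged with Datadog correlation data
listener.pop_tags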
#setup(&block) ⇒ Object
We define this alias to be consistent with WaterDrop#setup.
# File 'lib/karafka/instrumentation/vendors/datadog/logger_listener.rb', line 43

def setup(&block)
  configure(&block)
end
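Because #setup simply delegates to configure, the listener can also be configured after construction, mirroring WaterDrop's API. A sketch, again assuming the ddtrace tracer is loaded:

listener = ::Karafka::Instrumentation::Vendors::Datadog::LoggerListener.new

# Same effect as passing the block to the constructor.
listener.setup do |config|
  config.client = Datadog::Tracing
end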