Class: Karafka::Instrumentation::Vendors::Datadog::LoggerListener
- Inherits: Object
- Extended by: Forwardable
- Includes: Core::Configurable
- Defined in: lib/karafka/instrumentation/vendors/datadog/logger_listener.rb
Overview
Karafka's logger listener for Datadog. It depends on the `ddtrace` gem.
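A minimal wiring sketch, assuming the `ddtrace` gem is installed and Karafka is already configured. Subscribing the listener to `Karafka.monitor` is the standard way to attach instrumentation listeners in Karafka:

```ruby
# Sketch: attach the Datadog logger listener to Karafka's monitor.
# Requires the karafka and ddtrace gems to be available.
require 'ddtrace'

listener = Karafka::Instrumentation::Vendors::Datadog::LoggerListener.new

# From here on, worker and error events are logged with Datadog tracing.
Karafka.monitor.subscribe(listener)
```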
Instance Method Summary

- #initialize(&block) ⇒ LoggerListener (constructor)
  A new instance of LoggerListener.
- #on_error_occurred(event) ⇒ Object
  There are many types of errors that can occur in many places, but we provide a single handler for all of them to simplify error instrumentation.
- #on_worker_process(event) ⇒ Object
  Prints info about the fact that a given job has started.
- #on_worker_processed(event) ⇒ Object
  Prints info about the fact that a given job has finished.
- #pop_tags ⇒ Object
  Pops Datadog's tags from the logger. This is required when tracing log lines asynchronously to prevent logs of different processes from being correlated.
- #push_tags ⇒ Object
  Pushes Datadog's tags to the logger. This is required when tracing log lines asynchronously to correlate logs of the same process together.
- #setup(&block) ⇒ Object
Constructor Details
#initialize(&block) ⇒ LoggerListener
Returns a new instance of LoggerListener.
# File 'lib/karafka/instrumentation/vendors/datadog/logger_listener.rb', line 35

def initialize(&block)
  configure
  setup(&block) if block
end
Instance Method Details
#on_error_occurred(event) ⇒ Object
There are many types of errors that can occur in many places, but we provide a single handler for all of them to simplify error instrumentation.
# File 'lib/karafka/instrumentation/vendors/datadog/logger_listener.rb', line 103

def on_error_occurred(event)
  error = event[:error]
  client.active_span&.set_error(error)

  case event[:type]
  when 'consumer.consume.error'
    error "Consumer consuming error: #{error}"
  when 'consumer.revoked.error'
    error "Consumer on revoked failed due to an error: #{error}"
  when 'consumer.before_schedule.error'
    error "Consumer before schedule failed due to an error: #{error}"
  when 'consumer.before_consume.error'
    error "Consumer before consume failed due to an error: #{error}"
  when 'consumer.after_consume.error'
    error "Consumer after consume failed due to an error: #{error}"
  when 'consumer.shutdown.error'
    error "Consumer on shutdown failed due to an error: #{error}"
  when 'consumer.tick.error'
    error "Consumer tick failed due to an error: #{error}"
  when 'worker.process.error'
    fatal "Worker processing failed due to an error: #{error}"
  when 'connection.listener.fetch_loop.error'
    error "Listener fetch loop error: #{error}"
  when 'runner.call.error'
    fatal "Runner crashed due to an error: #{error}"
  when 'app.stopping.error'
    error 'Forceful Karafka server stop'
  when 'swarm.supervisor.error'
    fatal "Swarm supervisor crashed due to an error: #{error}"
  when 'librdkafka.error'
    error "librdkafka internal error occurred: #{error}"
  # Those will only occur when retries in the client fail and when they did not stop
  # after back-offs
  when 'connection.client.poll.error'
    error "Data polling error occurred: #{error}"
  else
    # This should never happen. Please contact the maintainers
    raise Errors::UnsupportedCaseError, event
  end
end
#on_worker_process(event) ⇒ Object
Prints info about the fact that a given job has started.

# File 'lib/karafka/instrumentation/vendors/datadog/logger_listener.rb', line 49

def on_worker_process(event)
  current_span = client.trace('karafka.consumer', service: service_name)

  job = event[:job]
  job_type = job.class.to_s.split('::').last
  consumer = job.executor.topic.consumer
  topic = job.executor.topic.name

  action = case job_type
           when 'Periodic'
             'tick'
           when 'PeriodicNonBlocking'
             'tick'
           when 'Shutdown'
             'shutdown'
           when 'Revoked'
             'revoked'
           when 'RevokedNonBlocking'
             'revoked'
           when 'Idle'
             'idle'
           else
             'consume'
           end

  current_span.resource = "#{consumer}##{action}"

  info "[#{job.id}] #{job_type} job for #{consumer} on #{topic} started"
end
#on_worker_processed(event) ⇒ Object
Prints info about the fact that a given job has finished.

# File 'lib/karafka/instrumentation/vendors/datadog/logger_listener.rb', line 84

def on_worker_processed(event)
  job = event[:job]
  time = event[:time]
  job_type = job.class.to_s.split('::').last
  consumer = job.executor.topic.consumer
  topic = job.executor.topic.name

  info "[#{job.id}] #{job_type} job for #{consumer} on #{topic} finished in #{time}ms"

  client.active_span&.finish
end
#pop_tags ⇒ Object
Pops Datadog's tags from the logger. This is required when tracing log lines asynchronously to prevent logs of different processes from being correlated.

# File 'lib/karafka/instrumentation/vendors/datadog/logger_listener.rb', line 167

def pop_tags
  return unless Karafka.logger.respond_to?(:pop_tags)

  Karafka.logger.pop_tags
end
#push_tags ⇒ Object
Pushes Datadog's tags to the logger. This is required when tracing log lines asynchronously to correlate logs of the same process together.

# File 'lib/karafka/instrumentation/vendors/datadog/logger_listener.rb', line 158

def push_tags
  return unless Karafka.logger.respond_to?(:push_tags)

  Karafka.logger.push_tags(client.log_correlation)
end
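A usage sketch for the tag helpers above, assuming a logger that supports tagged logging (e.g. `ActiveSupport::TaggedLogging`); both methods are no-ops otherwise, since they guard with `respond_to?`:

```ruby
# Sketch: correlate a batch of asynchronously written log lines
# with the current Datadog trace, then clean up.
# Requires the karafka and ddtrace gems to be available.
listener = Karafka::Instrumentation::Vendors::Datadog::LoggerListener.new

# Attach the Datadog correlation tags before logging...
listener.push_tags

Karafka.logger.info('This line carries the current trace correlation tags')

# ...and pop them afterwards so unrelated lines are not correlated.
listener.pop_tags
```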
#setup(&block) ⇒ Object
We define this alias to be consistent with `WaterDrop#setup`.

# File 'lib/karafka/instrumentation/vendors/datadog/logger_listener.rb', line 42

def setup(&block)
  configure(&block)
end
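A configuration sketch for `#setup`. The `client` setting name is inferred from this listener's use of `client.trace` and `client.active_span` above; treat it, and `Datadog::Tracing` as the value, as assumptions about the default ddtrace integration:

```ruby
# Sketch: configure the listener via #setup, mirroring WaterDrop#setup.
# Requires the karafka and ddtrace gems to be available.
require 'ddtrace'

listener = Karafka::Instrumentation::Vendors::Datadog::LoggerListener.new

listener.setup do |config|
  # Assumed setting name; the tracing client the listener will use
  # for spans and log correlation.
  config.client = Datadog::Tracing
end
```

The same block can be passed directly to `initialize`, which calls `setup(&block) if block`.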