Class: Karafka::Instrumentation::LoggerListener

Inherits:
Object
Defined in:
lib/karafka/instrumentation/logger_listener.rb

Overview

Default listener that hooks into our instrumentation and uses its events for logging. It can be removed or replaced without any harm to the Karafka app flow.
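The listener works through a pub/sub monitor: event ids such as `app.stopped` are routed to correspondingly named `on_*` methods. A minimal sketch of that pattern in plain Ruby (these `Tiny*` classes are illustrative stand-ins, not the real `Karafka::Core::Monitoring` implementation):

```ruby
# Illustrative sketch only: the monitor maps an event id such as
# 'app.stopped' to a listener method named on_app_stopped and invokes
# it with the event object.
TinyEvent = Struct.new(:id, :payload) do
  def [](key)
    payload[key]
  end
end

class TinyMonitor
  def initialize
    @listeners = []
  end

  def subscribe(listener)
    @listeners << listener
  end

  def instrument(event_id, payload = {})
    # 'app.stopped' => :on_app_stopped
    method_name = "on_#{event_id.tr('.', '_')}"
    event = TinyEvent.new(event_id, payload)
    @listeners.each do |listener|
      listener.public_send(method_name, event) if listener.respond_to?(method_name)
    end
  end
end

class TinyLoggerListener
  attr_reader :lines

  def initialize
    @lines = []
  end

  def on_app_stopped(_event)
    @lines << 'Stopped Karafka server'
  end
end

monitor = TinyMonitor.new
listener = TinyLoggerListener.new
monitor.subscribe(listener)
monitor.instrument('app.stopped')
```

Because the dispatch is name-based, replacing this listener is just a matter of subscribing a different object that responds to the same `on_*` methods.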

Instance Method Summary

Instance Method Details

#on_app_quiet(_event) ⇒ Object

Parameters:

  • _event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 151

def on_app_quiet(_event)
  info 'Reached quiet mode. No messages will be processed anymore'
end

#on_app_quieting(_event) ⇒ Object

Parameters:

  • _event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 146

def on_app_quieting(_event)
  info 'Switching to quiet mode. New messages will not be processed'
end

#on_app_running(_event) ⇒ Object

Logs info that we’re running the Karafka app.

Parameters:

  • _event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 136

def on_app_running(_event)
  info "Running in #{RUBY_DESCRIPTION}"
  info "Running Karafka #{Karafka::VERSION} server"

  return if Karafka.pro?

  info 'See LICENSE and the LGPL-3.0 for licensing details'
end

#on_app_stopped(_event) ⇒ Object

Logs info that we stopped the Karafka server.

Parameters:

  • _event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 165

def on_app_stopped(_event)
  info 'Stopped Karafka server'
end

#on_app_stopping(_event) ⇒ Object

Logs info that we’re going to stop the Karafka server.

Parameters:

  • _event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 158

def on_app_stopping(_event)
  info 'Stopping Karafka server'
end

#on_client_pause(event) ⇒ Object

Prints info about a consumer pause occurrence, whether user- or system-initiated.

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 69

def on_client_pause(event)
  topic = event[:topic]
  partition = event[:partition]
  offset = event[:offset]
  client = event[:caller]

  info <<~MSG.tr("\n", ' ').strip!
    [#{client.id}] Pausing partition #{partition} of topic #{topic} on offset #{offset}
  MSG
end

#on_client_resume(event) ⇒ Object

Prints info about resuming processing of a given topic partition.

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 83

def on_client_resume(event)
  topic = event[:topic]
  partition = event[:partition]
  client = event[:caller]

  info <<~MSG.tr("\n", ' ').strip!
    [#{client.id}] Resuming partition #{partition} of topic #{topic}
  MSG
end

#on_connection_listener_fetch_loop(event) ⇒ Object

Logs each message-fetching attempt.

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 22

def on_connection_listener_fetch_loop(event)
  listener = event[:caller]
  debug "[#{listener.id}] Polling messages..."
end

#on_connection_listener_fetch_loop_received(event) ⇒ Object

Logs info about messages that we’ve received from Kafka.

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 30

def on_connection_listener_fetch_loop_received(event)
  listener = event[:caller]
  time = event[:time]
  messages_count = event[:messages_buffer].size

  message = "[#{listener.id}] Polled #{messages_count} messages in #{time}ms"

  # We don't want the "polled 0" in dev as it would spam the log
  # Instead we publish only info when there was anything we could poll and fail over to the
  # zero notifications when in debug mode
  messages_count.zero? ? debug(message) : info(message)
end

#on_consumer_consuming_retry(event) ⇒ Object

Prints info about a retry of processing after an error.

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 96

def on_consumer_consuming_retry(event)
  topic = event[:topic]
  partition = event[:partition]
  offset = event[:offset]
  consumer = event[:caller]
  timeout = event[:timeout]

  info <<~MSG.tr("\n", ' ').strip!
    [#{consumer.id}] Retrying of #{consumer.class} after #{timeout} ms
    on partition #{partition} of topic #{topic} from offset #{offset}
  MSG
end

#on_dead_letter_queue_dispatched(event) ⇒ Object

Logs info when we have dispatched a message to the DLQ.

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 172

def on_dead_letter_queue_dispatched(event)
  message = event[:message]
  offset = message.offset
  topic = event[:caller].topic.name
  dlq_topic = event[:caller].topic.dead_letter_queue.topic
  partition = message.partition

  info "Dispatched message #{offset} from #{topic}/#{partition} to DLQ topic: #{dlq_topic}"
end

#on_error_occurred(event) ⇒ Object

There are many types of errors that can occur in many places, but we provide a single handler for all of them to simplify error instrumentation.

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 185

def on_error_occurred(event)
  type = event[:type]
  error = event[:error]
  details = (error.backtrace || []).join("\n")

  case type
  when 'consumer.consume.error'
    error "Consumer consuming error: #{error}"
    error details
  when 'consumer.revoked.error'
    error "Consumer on revoked failed due to an error: #{error}"
    error details
  when 'consumer.before_enqueue.error'
    error "Consumer before enqueue failed due to an error: #{error}"
    error details
  when 'consumer.before_consume.error'
    error "Consumer before consume failed due to an error: #{error}"
    error details
  when 'consumer.after_consume.error'
    error "Consumer after consume failed due to an error: #{error}"
    error details
  when 'consumer.shutdown.error'
    error "Consumer on shutdown failed due to an error: #{error}"
    error details
  when 'worker.process.error'
    fatal "Worker processing failed due to an error: #{error}"
    fatal details
  when 'connection.listener.fetch_loop.error'
    error "Listener fetch loop error: #{error}"
    error details
  when 'runner.call.error'
    fatal "Runner crashed due to an error: #{error}"
    fatal details
  when 'app.stopping.error'
    error 'Forceful Karafka server stop'
  when 'librdkafka.error'
    error "librdkafka internal error occurred: #{error}"
    error details
  # Those will only occur when retries in the client fail and when they did not stop after
  # back-offs
  when 'connection.client.poll.error'
    error "Data polling error occurred: #{error}"
    error details
  else
    # This should never happen. Please contact the maintainers
    raise Errors::UnsupportedCaseError, event
  end
end
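Note the severity split in the handler above: consumer-, connection-, and librdkafka-level failures log at error level, while worker and runner crashes escalate to fatal because they can take the whole process down. Extracted as a small sketch (`severity_for` is a hypothetical helper, not part of Karafka's API):

```ruby
# Hypothetical helper mirroring the severity policy of on_error_occurred:
# worker/runner crashes are fatal, everything else logs as a plain error.
def severity_for(type)
  case type
  when 'worker.process.error', 'runner.call.error'
    :fatal
  else
    :error
  end
end
```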

#on_process_notice_signal(event) ⇒ Object

Logs info about system signals that Karafka received and prints a backtrace for each thread when the TTIN signal is received.

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 113

def on_process_notice_signal(event)
  info "Received #{event[:signal]} system signal"

  # We print backtrace only for ttin
  return unless event[:signal] == :SIGTTIN

  # Inspired by Sidekiq
  Thread.list.each do |thread|
    tid = (thread.object_id ^ ::Process.pid).to_s(36)

    warn "Thread TID-#{tid} #{thread['label']}"

    if thread.backtrace
      warn thread.backtrace.join("\n")
    else
      warn '<no backtrace available>'
    end
  end
end
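The `TID` in those warning lines is computed Sidekiq-style, as the code above shows: the thread's `object_id` is XORed with the process pid and rendered in base 36, yielding a short identifier that stays stable for the thread's lifetime:

```ruby
# Same derivation as in on_process_notice_signal, applied to the
# current thread: XOR with the pid, then base-36 encode.
thread = Thread.current
tid = (thread.object_id ^ Process.pid).to_s(36)
```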

#on_worker_process(event) ⇒ Object

Prints info that a given job has started.

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 46

def on_worker_process(event)
  job = event[:job]
  job_type = job.class.to_s.split('::').last
  consumer = job.executor.topic.consumer
  topic = job.executor.topic.name
  info "[#{job.id}] #{job_type} job for #{consumer} on #{topic} started"
end
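The `job_type` in the log line is just the demodulized class name, as the `split('::').last` above shows. With an illustrative fully qualified name (the exact class is an assumption for demonstration):

```ruby
# 'Karafka::Processing::Jobs::Consume' is an illustrative class name;
# demodulizing keeps only the last constant segment.
job_class_name = 'Karafka::Processing::Jobs::Consume'
job_type = job_class_name.split('::').last
```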

#on_worker_processed(event) ⇒ Object

Prints info that a given job has finished.

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 57

def on_worker_processed(event)
  job = event[:job]
  time = event[:time]
  job_type = job.class.to_s.split('::').last
  consumer = job.executor.topic.consumer
  topic = job.executor.topic.name
  info "[#{job.id}] #{job_type} job for #{consumer} on #{topic} finished in #{time}ms"
end