Class: Karafka::Instrumentation::LoggerListener

Inherits:
Object
  • Object
show all
Defined in:
lib/karafka/instrumentation/logger_listener.rb

Overview

Default listener that hooks up to our instrumentation and uses its events for logging It can be removed/replaced or anything without any harm to the Karafka app flow.

Instance Method Summary collapse

Constructor Details

#initialize(log_polling: true) ⇒ LoggerListener

Returns a new instance of LoggerListener.

Parameters:

  • log_polling (Boolean) (defaults to: true)

    should we log the fact that messages are being polled. This is usually noisy and not useful in production but can be useful in dev. While users can do this themselves this has been requested and asked for often, thus similar to how extensive logging can be disabled in WaterDrop, we do it here as well.



23
24
25
# File 'lib/karafka/instrumentation/logger_listener.rb', line 23

def initialize(log_polling: true)
  @log_polling = log_polling
end

Instance Method Details

#on_app_quiet(_event) ⇒ Object

Parameters:

  • _event (Karafka::Core::Monitoring::Event)

    event details including payload



187
188
189
# File 'lib/karafka/instrumentation/logger_listener.rb', line 187

def on_app_quiet(_event)
  info 'Reached quiet mode. No messages will be processed anymore'
end

#on_app_quieting(_event) ⇒ Object

Parameters:

  • _event (Karafka::Core::Monitoring::Event)

    event details including payload



182
183
184
# File 'lib/karafka/instrumentation/logger_listener.rb', line 182

def on_app_quieting(_event)
  info 'Switching to quiet mode. New messages will not be processed'
end

#on_app_running(_event) ⇒ Object

Logs info that we’re running Karafka app.

Parameters:

  • _event (Karafka::Core::Monitoring::Event)

    event details including payload



172
173
174
175
176
177
178
179
# File 'lib/karafka/instrumentation/logger_listener.rb', line 172

def on_app_running(_event)
  info "Running in #{RUBY_DESCRIPTION}"
  info "Running Karafka #{Karafka::VERSION} server"

  return if Karafka.pro?

  info 'See LICENSE and the LGPL-3.0 for licensing details'
end

#on_app_stopped(_event) ⇒ Object

Logs info that we stopped the Karafka server.

Parameters:

  • _event (Karafka::Core::Monitoring::Event)

    event details including payload



201
202
203
# File 'lib/karafka/instrumentation/logger_listener.rb', line 201

def on_app_stopped(_event)
  info 'Stopped Karafka server'
end

#on_app_stopping(_event) ⇒ Object

Logs info that we’re going to stop the Karafka server.

Parameters:

  • _event (Karafka::Core::Monitoring::Event)

    event details including payload



194
195
196
# File 'lib/karafka/instrumentation/logger_listener.rb', line 194

def on_app_stopping(_event)
  info 'Stopping Karafka server'
end

#on_client_pause(event) ⇒ Object

Note:

There may be no offset provided in case user wants to pause on the consecutive offset position. This can be beneficial when not wanting to purge the buffers.

Prints info about a consumer pause occurrence. Irrelevant if user or system initiated.

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/karafka/instrumentation/logger_listener.rb', line 88

def on_client_pause(event)
  topic = event[:topic]
  partition = event[:partition]
  offset = event[:offset]
  client = event[:caller]

  info <<~MSG.tr("\n", ' ').strip!
    [#{client.id}]
    Pausing on topic #{topic}/#{partition}
    on #{offset ? "offset #{offset}" : 'the consecutive offset'}
  MSG
end

#on_client_resume(event) ⇒ Object

Prints information about resuming of processing of a given topic partition

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



104
105
106
107
108
109
110
111
112
# File 'lib/karafka/instrumentation/logger_listener.rb', line 104

def on_client_resume(event)
  topic = event[:topic]
  partition = event[:partition]
  client = event[:caller]

  info <<~MSG.tr("\n", ' ').strip!
    [#{client.id}] Resuming on topic #{topic}/#{partition}
  MSG
end

#on_connection_listener_fetch_loop(event) ⇒ Object

Logs each messages fetching attempt

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



30
31
32
33
34
35
# File 'lib/karafka/instrumentation/logger_listener.rb', line 30

def on_connection_listener_fetch_loop(event)
  return unless log_polling?

  listener = event[:caller]
  debug "[#{listener.id}] Polling messages..."
end

#on_connection_listener_fetch_loop_received(event) ⇒ Object

Logs about messages that we’ve received from Kafka

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/karafka/instrumentation/logger_listener.rb', line 40

def on_connection_listener_fetch_loop_received(event)
  return unless log_polling?

  listener = event[:caller]
  time = event[:time].round(2)
  messages_count = event[:messages_buffer].size

  message = "[#{listener.id}] Polled #{messages_count} messages in #{time}ms"

  # We don't want the "polled 0" in dev as it would spam the log
  # Instead we publish only info when there was anything we could poll and fail over to the
  # zero notifications when in debug mode
  messages_count.zero? ? debug(message) : info(message)
end

#on_consumer_consuming_retry(event) ⇒ Object

Prints info about retry of processing after an error

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/karafka/instrumentation/logger_listener.rb', line 117

def on_consumer_consuming_retry(event)
  topic = event[:topic]
  partition = event[:partition]
  offset = event[:offset]
  consumer = event[:caller]
  timeout = event[:timeout]

  info <<~MSG.tr("\n", ' ').strip!
    [#{consumer.id}] Retrying of #{consumer.class} after #{timeout} ms
    on topic #{topic}/#{partition} from offset #{offset}
  MSG
end

#on_consumer_consuming_seek(event) ⇒ Object

Prints info about seeking to a particular location

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



133
134
135
136
137
138
139
140
141
142
143
# File 'lib/karafka/instrumentation/logger_listener.rb', line 133

def on_consumer_consuming_seek(event)
  topic = event[:topic]
  partition = event[:partition]
  seek_offset = event[:message].offset
  consumer = event[:caller]

  info <<~MSG.tr("\n", ' ').strip!
    [#{consumer.id}] Seeking from #{consumer.class}
    on topic #{topic}/#{partition} to offset #{seek_offset}
  MSG
end

#on_dead_letter_queue_dispatched(event) ⇒ Object

Logs info when we have dispatched a message the the DLQ

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



208
209
210
211
212
213
214
215
216
217
218
219
220
221
# File 'lib/karafka/instrumentation/logger_listener.rb', line 208

def on_dead_letter_queue_dispatched(event)
  consumer = event[:caller]
  topic = consumer.topic.name
  message = event[:message]
  offset = message.offset
  dlq_topic = consumer.topic.dead_letter_queue.topic
  partition = message.partition

  info <<~MSG.tr("\n", ' ').strip!
    [#{consumer.id}] Dispatched message #{offset}
    from #{topic}/#{partition}
    to DLQ topic: #{dlq_topic}
  MSG
end

#on_error_occurred(event) ⇒ Object

There are many types of errors that can occur in many places, but we provide a single handler for all of them to simplify error instrumentation.

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
# File 'lib/karafka/instrumentation/logger_listener.rb', line 287

def on_error_occurred(event)
  type = event[:type]
  error = event[:error]
  details = (error.backtrace || []).join("\n")

  case type
  when 'consumer.initialized.error'
    error "Consumer initialized error: #{error}"
    error details
  when 'consumer.wrap.error'
    error "Consumer wrap failed due to an error: #{error}"
    error details
  when 'consumer.consume.error'
    error "Consumer consuming error: #{error}"
    error details
  when 'consumer.revoked.error'
    error "Consumer on revoked failed due to an error: #{error}"
    error details
  when 'consumer.idle.error'
    error "Consumer idle failed due to an error: #{error}"
    error details
  when 'consumer.shutdown.error'
    error "Consumer on shutdown failed due to an error: #{error}"
    error details
  when 'consumer.tick.error'
    error "Consumer on tick failed due to an error: #{error}"
    error details
  when 'consumer.eofed.error'
    error "Consumer on eofed failed due to an error: #{error}"
    error details
  when 'consumer.after_consume.error'
    error "Consumer on after_consume failed due to an error: #{error}"
    error details
  when 'worker.process.error'
    fatal "Worker processing failed due to an error: #{error}"
    fatal details
  when 'connection.listener.fetch_loop.error'
    error "Listener fetch loop error: #{error}"
    error details
  when 'swarm.supervisor.error'
    fatal "Swarm supervisor crashed due to an error: #{error}"
    fatal details
  when 'runner.call.error'
    fatal "Runner crashed due to an error: #{error}"
    fatal details
  when 'app.stopping.error'
    # Counts number of workers and listeners that were still active when forcing the
    # shutdown. Please note, that unless all listeners are closed, workers will not finalize
    # their operations as well.
    # We need to check if listeners and workers are assigned as during super early stages of
    # boot they are not.
    listeners = Server.listeners ? Server.listeners.count(&:active?) : 0
    workers = Server.workers ? Server.workers.count(&:alive?) : 0

    message = <<~MSG.tr("\n", ' ').strip!
      Forceful Karafka server stop with:
      #{workers} active workers and
      #{listeners} active listeners
    MSG

    error message
  when 'app.forceful_stopping.error'
    error "Forceful shutdown error occurred: #{error}"
    error details
  when 'librdkafka.error'
    error "librdkafka internal error occurred: #{error}"
    error details
  # Those can occur when emitted statistics are consumed by the end user and the processing
  # of statistics fails. The statistics are emitted from librdkafka main loop thread and
  # any errors there crash the whole thread
  when 'callbacks.statistics.error'
    error "callbacks.statistics processing failed due to an error: #{error}"
    error details
  when 'callbacks.error.error'
    error "callbacks.error processing failed due to an error: #{error}"
    error details
  # Those will only occur when retries in the client fail and when they did not stop after
  # back-offs
  when 'connection.client.poll.error'
    error "Data polling error occurred: #{error}"
    error details
  when 'connection.client.rebalance_callback.error'
    error "Rebalance callback error occurred: #{error}"
    error details
  when 'connection.client.unsubscribe.error'
    error "Client unsubscribe error occurred: #{error}"
    error details
  # This handles any custom errors coming from places like Web-UI, etc
  else
    error "#{type} error occurred: #{error}"
    error details
  end
end

#on_filtering_seek(event) ⇒ Object

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



242
243
244
245
246
247
248
249
250
251
252
253
254
# File 'lib/karafka/instrumentation/logger_listener.rb', line 242

def on_filtering_seek(event)
  consumer = event[:caller]
  topic = consumer.topic.name
  # Message to which we seek
  message = event[:message]
  partition = message.partition
  offset = message.offset

  info <<~MSG.tr("\n", ' ').strip!
    [#{consumer.id}] Post-filtering seeking to message #{offset}
    on #{topic}/#{partition}
  MSG
end

#on_filtering_throttled(event) ⇒ Object

Logs info about throttling event

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



226
227
228
229
230
231
232
233
234
235
236
237
238
239
# File 'lib/karafka/instrumentation/logger_listener.rb', line 226

def on_filtering_throttled(event)
  consumer = event[:caller]
  topic = consumer.topic.name
  # Here we get last message before throttle
  message = event[:message]
  partition = message.partition
  offset = message.offset

  info <<~MSG.tr("\n", ' ').strip!
    [#{consumer.id}] Throttled and will resume
    from message #{offset}
    on #{topic}/#{partition}
  MSG
end

#on_process_notice_signal(event) ⇒ Object

Logs info about system signals that Karafka received and prints backtrace for threads in case of ttin

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# File 'lib/karafka/instrumentation/logger_listener.rb', line 149

def on_process_notice_signal(event)
  info "Received #{event[:signal]} system signal"

  # We print backtrace only for ttin
  return unless event[:signal] == :SIGTTIN

  # Inspired by Sidekiq
  Thread.list.each do |thread|
    tid = (thread.object_id ^ ::Process.pid).to_s(36)

    warn "Thread TID-#{tid} #{thread.name}"

    if thread.backtrace
      warn thread.backtrace.join("\n")
    else
      warn '<no backtrace available>'
    end
  end
end

#on_swarm_manager_before_fork(event) ⇒ Object

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



269
270
271
# File 'lib/karafka/instrumentation/logger_listener.rb', line 269

def on_swarm_manager_before_fork(event)
  debug "Swarm manager starting node with id: #{event[:node].id}"
end

#on_swarm_manager_control(event) ⇒ Object

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



279
280
281
282
# File 'lib/karafka/instrumentation/logger_listener.rb', line 279

def on_swarm_manager_control(event)
  pids = event[:caller].nodes.map(&:pid).join(', ')
  debug "Swarm manager checking nodes: #{pids}"
end

#on_swarm_manager_stopping(event) ⇒ Object

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



257
258
259
260
# File 'lib/karafka/instrumentation/logger_listener.rb', line 257

def on_swarm_manager_stopping(event)
  node = event[:node]
  error "Swarm manager detected unhealthy node #{node.pid}. Sending TERM signal..."
end

#on_swarm_manager_terminating(event) ⇒ Object

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



263
264
265
266
# File 'lib/karafka/instrumentation/logger_listener.rb', line 263

def on_swarm_manager_terminating(event)
  node = event[:node]
  error "Swarm manager detected unresponsive node #{node.pid}. Sending KILL signal..."
end

#on_swarm_node_after_fork(_event) ⇒ Object

Parameters:

  • _event (Karafka::Core::Monitoring::Event)

    event details including payload



274
275
276
# File 'lib/karafka/instrumentation/logger_listener.rb', line 274

def on_swarm_node_after_fork(_event)
  info "Swarm node #{::Process.pid} forked from #{::Process.ppid}"
end

#on_worker_process(event) ⇒ Object

Prints info about the fact that a given job has started

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



58
59
60
61
62
63
64
65
# File 'lib/karafka/instrumentation/logger_listener.rb', line 58

def on_worker_process(event)
  job = event[:job]
  job_type = job.class.to_s.split('::').last
  consumer = job.executor.topic.consumer
  topic = job.executor.topic.name
  partition = job.executor.partition
  info "[#{job.id}] #{job_type} job for #{consumer} on #{topic}/#{partition} started"
end

#on_worker_processed(event) ⇒ Object

Prints info about the fact that a given job has finished

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/karafka/instrumentation/logger_listener.rb', line 70

def on_worker_processed(event)
  job = event[:job]
  time = event[:time].round(2)
  job_type = job.class.to_s.split('::').last
  consumer = job.executor.topic.consumer
  topic = job.executor.topic.name
  partition = job.executor.partition
  info <<~MSG.tr("\n", ' ').strip!
    [#{job.id}] #{job_type} job for #{consumer}
    on #{topic}/#{partition} finished in #{time} ms
  MSG
end