Class: Karafka::Instrumentation::LoggerListener

Inherits: Object
Defined in: lib/karafka/instrumentation/logger_listener.rb

Overview

Default listener that hooks up to our instrumentation and uses its events for logging. It can be removed or replaced without any harm to the Karafka app flow.

Instance Method Summary

Instance Method Details

#on_app_quiet(_event) ⇒ Object

Parameters:

  • _event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 156

def on_app_quiet(_event)
  info 'Reached quiet mode. No messages will be processed anymore'
end

#on_app_quieting(_event) ⇒ Object

Parameters:

  • _event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 151

def on_app_quieting(_event)
  info 'Switching to quiet mode. New messages will not be processed'
end

#on_app_running(_event) ⇒ Object

Logs info that we’re running the Karafka app.

Parameters:

  • _event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 141

def on_app_running(_event)
  info "Running in #{RUBY_DESCRIPTION}"
  info "Running Karafka #{Karafka::VERSION} server"

  return if Karafka.pro?

  info 'See LICENSE and the LGPL-3.0 for licensing details'
end

#on_app_stopped(_event) ⇒ Object

Logs info that we stopped the Karafka server.

Parameters:

  • _event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 170

def on_app_stopped(_event)
  info 'Stopped Karafka server'
end

#on_app_stopping(_event) ⇒ Object

Logs info that we’re going to stop the Karafka server.

Parameters:

  • _event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 163

def on_app_stopping(_event)
  info 'Stopping Karafka server'
end

#on_client_pause(event) ⇒ Object

Prints info about a consumer pause occurrence, whether user- or system-initiated.

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 74

def on_client_pause(event)
  topic = event[:topic]
  partition = event[:partition]
  offset = event[:offset]
  client = event[:caller]

  info <<~MSG.tr("\n", ' ').strip!
    [#{client.id}] Pausing on topic #{topic}/#{partition} on offset #{offset}
  MSG
end
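The `<<~MSG.tr("\n", ' ').strip!` pattern used here (and in the other handlers below) collapses a readable multi-line heredoc into a single log line. A standalone sketch with made-up values:

```ruby
# Illustrative values only; in the listener these come from the event.
client_id = 'consumer-1'
topic = 'orders'
partition = 0
offset = 42

# The squiggly heredoc strips leading indentation; `tr` turns newlines
# into spaces, and `strip!` drops the trailing space left by the
# heredoc's final newline.
line = <<~MSG.tr("\n", ' ').strip!
  [#{client_id}] Pausing on topic #{topic}/#{partition}
  on offset #{offset}
MSG
# line => "[consumer-1] Pausing on topic orders/0 on offset 42"
```

Note that `strip!` returns `nil` when nothing changes; it is safe here only because the heredoc always ends in a newline-turned-space.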

#on_client_resume(event) ⇒ Object

Prints info about resuming processing of a given topic partition.

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 88

def on_client_resume(event)
  topic = event[:topic]
  partition = event[:partition]
  client = event[:caller]

  info <<~MSG.tr("\n", ' ').strip!
    [#{client.id}] Resuming on topic #{topic}/#{partition}
  MSG
end

#on_connection_listener_fetch_loop(event) ⇒ Object

Logs each message-fetching attempt.

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 22

def on_connection_listener_fetch_loop(event)
  listener = event[:caller]
  debug "[#{listener.id}] Polling messages..."
end

#on_connection_listener_fetch_loop_received(event) ⇒ Object

Logs info about messages that we’ve received from Kafka.

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 30

def on_connection_listener_fetch_loop_received(event)
  listener = event[:caller]
  time = event[:time]
  messages_count = event[:messages_buffer].size

  message = "[#{listener.id}] Polled #{messages_count} messages in #{time}ms"

  # We don't want the "polled 0" in dev as it would spam the log
  # Instead we publish only info when there was anything we could poll and fail over to the
  # zero notifications when in debug mode
  messages_count.zero? ? debug(message) : info(message)
end
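The level selection on the last line is what keeps an idle consumer from flooding the log: zero-message polls are demoted to debug, so they only appear when the logger runs at debug level. A self-contained sketch of the same idea using the stdlib `Logger`:

```ruby
require 'logger'
require 'stringio'

io = StringIO.new
log = Logger.new(io)
log.level = Logger::INFO # typical production level

[0, 3].each do |count|
  message = "Polled #{count} messages"
  # Same ternary as the listener: quiet polls go to debug.
  count.zero? ? log.debug(message) : log.info(message)
end

# At INFO level only the "Polled 3 messages" line is emitted.
```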

#on_consumer_consuming_retry(event) ⇒ Object

Prints info about a processing retry after an error.

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 101

def on_consumer_consuming_retry(event)
  topic = event[:topic]
  partition = event[:partition]
  offset = event[:offset]
  consumer = event[:caller]
  timeout = event[:timeout]

  info <<~MSG.tr("\n", ' ').strip!
    [#{consumer.id}] Retrying of #{consumer.class} after #{timeout} ms
    on topic #{topic}/#{partition} from offset #{offset}
  MSG
end

#on_dead_letter_queue_dispatched(event) ⇒ Object

Logs info when we have dispatched a message to the DLQ.

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 177

def on_dead_letter_queue_dispatched(event)
  consumer = event[:caller]
  topic = consumer.topic.name
  message = event[:message]
  offset = message.offset
  dlq_topic = consumer.topic.dead_letter_queue.topic
  partition = message.partition

  info <<~MSG.tr("\n", ' ').strip!
    [#{consumer.id}] Dispatched message #{offset}
    from #{topic}/#{partition}
    to DLQ topic: #{dlq_topic}
  MSG
end

#on_error_occurred(event) ⇒ Object

There are many types of errors that can occur in many places, but we provide a single handler for all of them to simplify error instrumentation.

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 228

def on_error_occurred(event)
  type = event[:type]
  error = event[:error]
  details = (error.backtrace || []).join("\n")

  case type
  when 'consumer.consume.error'
    error "Consumer consuming error: #{error}"
    error details
  when 'consumer.revoked.error'
    error "Consumer on revoked failed due to an error: #{error}"
    error details
  when 'consumer.before_enqueue.error'
    error "Consumer before enqueue failed due to an error: #{error}"
    error details
  when 'consumer.before_consume.error'
    error "Consumer before consume failed due to an error: #{error}"
    error details
  when 'consumer.after_consume.error'
    error "Consumer after consume failed due to an error: #{error}"
    error details
  when 'consumer.idle.error'
    error "Consumer idle failed due to an error: #{error}"
    error details
  when 'consumer.shutdown.error'
    error "Consumer on shutdown failed due to an error: #{error}"
    error details
  when 'worker.process.error'
    fatal "Worker processing failed due to an error: #{error}"
    fatal details
  when 'connection.listener.fetch_loop.error'
    error "Listener fetch loop error: #{error}"
    error details
  when 'runner.call.error'
    fatal "Runner crashed due to an error: #{error}"
    fatal details
  when 'app.stopping.error'
    error 'Forceful Karafka server stop'
  when 'librdkafka.error'
    error "librdkafka internal error occurred: #{error}"
    error details
  # Those can occur when emitted statistics are consumed by the end user and the processing
  # of statistics fails. The statistics are emitted from librdkafka main loop thread and
  # any errors there crash the whole thread
  when 'statistics.emitted.error'
    error "statistics.emitted processing failed due to an error: #{error}"
    error details
  # Those will only occur when retries in the client fail and when they did not stop after
  # back-offs
  when 'connection.client.poll.error'
    error "Data polling error occurred: #{error}"
    error details
  when 'connection.client.rebalance_callback.error'
    error "Rebalance callback error occurred: #{error}"
    error details
  when 'connection.client.unsubscribe.error'
    error "Client unsubscribe error occurred: #{error}"
    error details
  else
    # This should never happen. Please contact the maintainers
    raise Errors::UnsupportedCaseError, event
  end
end
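The type strings in the `case` above are the contract for anyone attaching their own handler to the `error.occurred` channel alongside (or instead of) this listener. A hedged sketch of a side-subscriber that picks a severity from the payload; the `ERROR_TYPES_TO_PAGE` list and the `:page`/`:log` outcomes are invented for illustration:

```ruby
# Types the default listener logs at fatal level; page only on those.
ERROR_TYPES_TO_PAGE = %w[worker.process.error runner.call.error].freeze

error_reporter = lambda do |event|
  # Payload access mirrors the handler above: event[:type], event[:error].
  ERROR_TYPES_TO_PAGE.include?(event[:type]) ? :page : :log
end

# Wiring sketch, assuming Karafka's monitor subscription API:
#   Karafka.monitor.subscribe('error.occurred', &error_reporter)
```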

#on_filtering_seek(event) ⇒ Object

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 211

def on_filtering_seek(event)
  consumer = event[:caller]
  topic = consumer.topic.name
  # Message to which we seek
  message = event[:message]
  partition = message.partition
  offset = message.offset

  info <<~MSG.tr("\n", ' ').strip!
    [#{consumer.id}] Post-filtering seeking to message #{offset}
    on #{topic}/#{partition}
  MSG
end

#on_filtering_throttled(event) ⇒ Object

Logs info about a throttling event.

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 195

def on_filtering_throttled(event)
  consumer = event[:caller]
  topic = consumer.topic.name
  # Here we get last message before throttle
  message = event[:message]
  partition = message.partition
  offset = message.offset

  info <<~MSG.tr("\n", ' ').strip!
    [#{consumer.id}] Throttled and will resume
    from message #{offset}
    on #{topic}/#{partition}
  MSG
end

#on_process_notice_signal(event) ⇒ Object

Logs info about system signals that Karafka received and prints a backtrace for all threads upon TTIN.

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 118

def on_process_notice_signal(event)
  info "Received #{event[:signal]} system signal"

  # We print backtrace only for ttin
  return unless event[:signal] == :SIGTTIN

  # Inspired by Sidekiq
  Thread.list.each do |thread|
    tid = (thread.object_id ^ ::Process.pid).to_s(36)

    warn "Thread TID-#{tid} #{thread['label']}"

    if thread.backtrace
      warn thread.backtrace.join("\n")
    else
      warn '<no backtrace available>'
    end
  end
end
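The Sidekiq-inspired TID gives each thread a short tag that is stable within a single process run. In isolation:

```ruby
# XOR the thread's object_id with the process pid, then render in
# base 36 for a compact lowercase alphanumeric tag (value varies per run).
tid = (Thread.current.object_id ^ Process.pid).to_s(36)
```

XOR-ing with the pid makes tags from different processes on the same host unlikely to collide while staying cheap to compute.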

#on_worker_process(event) ⇒ Object

Prints info that a given job has started.

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 46

def on_worker_process(event)
  job = event[:job]
  job_type = job.class.to_s.split('::').last
  consumer = job.executor.topic.consumer
  topic = job.executor.topic.name
  partition = job.executor.partition
  info "[#{job.id}] #{job_type} job for #{consumer} on #{topic}/#{partition} started"
end

#on_worker_processed(event) ⇒ Object

Prints info that a given job has finished.

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 58

def on_worker_processed(event)
  job = event[:job]
  time = event[:time]
  job_type = job.class.to_s.split('::').last
  consumer = job.executor.topic.consumer
  topic = job.executor.topic.name
  partition = job.executor.partition
  info <<~MSG.tr("\n", ' ').strip!
    [#{job.id}] #{job_type} job for #{consumer}
    on #{topic}/#{partition} finished in #{time}ms
  MSG
end