Module: Deimos::KafkaListener
- Defined in:
- lib/deimos/instrumentation.rb
Overview
This module listens to events published by RubyKafka.
Class Method Summary collapse
-
.send_produce_error(event) ⇒ void
Listens for any exceptions that happen during publishing and re-publishes as a Deimos event.
Class Method Details
.send_produce_error(event) ⇒ void
This method returns an undefined value.
Listens for any exceptions that happen during publishing and re-publishes as a Deimos event.
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/deimos/instrumentation.rb', line 50 def self.send_produce_error(event) exception = event.payload[:exception_object] return unless exception if exception.respond_to?(:failed_messages) = exception. .group_by(&:topic).each do |topic, batch| producer = Deimos::Producer.descendants.find { |c| c.topic == topic } next if batch.empty? || !producer decoder = Deimos.schema_backend(schema: producer.config[:schema], namespace: producer.config[:namespace]) payloads = batch.map { |m| decoder.decode(m.value) } Deimos.config.metrics&.increment( 'publish_error', tags: %W(topic:#{topic}), by: payloads.size ) Deimos.instrument( 'produce_error', producer: producer, topic: topic, exception_object: exception, payloads: payloads ) end else Deimos.config.metrics&.increment( 'publish_error', by: event.payload[:message_count] || 1 ) end end |