Module: Deimos::KafkaListener
- Defined in: lib/deimos/instrumentation.rb
Overview
This module listens to events published by RubyKafka.
Class Method Summary
- .handle_exception_with_messages(exception) ⇒ Object
- .send_produce_error(event) ⇒ void
  Listens for any exceptions that happen during publishing and re-publishes them as a Deimos event.
Class Method Details
.handle_exception_with_messages(exception) ⇒ Object
# File 'lib/deimos/instrumentation.rb', line 47

def self.handle_exception_with_messages(exception)
  messages = exception.failed_messages
  messages.group_by(&:topic).each do |topic, batch|
    producer = Deimos::Producer.descendants.find { |c| c.topic == topic }
    next if batch.empty? || !producer

    decoder = Deimos.schema_backend(schema: producer.config[:schema],
                                    namespace: producer.config[:namespace])
    payloads = batch.map { |m| decoder.decode(m.value) }

    Deimos.config.metrics&.increment(
      'publish_error',
      tags: %W(topic:#{topic}),
      by: payloads.size
    )
    Deimos.instrument(
      'produce_error',
      producer: producer,
      topic: topic,
      exception_object: exception,
      payloads: payloads
    )
  end
end
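The per-topic batching step in the method above can be tried in isolation. The following is a minimal sketch, not Deimos code: `FailedMessage`, `FakeDeliveryError`, and `failed_values_by_topic` are hypothetical stand-ins, and the sketch assumes only that the exception exposes `failed_messages` and that each message responds to `topic` and `value`. It skips the producer lookup, schema decoding, metrics, and instrumentation.

```ruby
# Hypothetical stand-in for a failed Kafka message (not a ruby-kafka class).
FailedMessage = Struct.new(:topic, :value)

# Hypothetical stand-in for an exception that carries its failed messages.
class FakeDeliveryError < StandardError
  attr_reader :failed_messages

  def initialize(failed_messages)
    super('delivery failed')
    @failed_messages = failed_messages
  end
end

# Groups the failed messages by topic and keeps only the raw values,
# mirroring the group_by(&:topic) step shown in the method source.
def failed_values_by_topic(exception)
  exception.failed_messages
           .group_by(&:topic)
           .transform_values { |batch| batch.map(&:value) }
end

error = FakeDeliveryError.new(
  [
    FailedMessage.new('orders', 'a'),
    FailedMessage.new('orders', 'b'),
    FailedMessage.new('users', 'c')
  ]
)

failed_values_by_topic(error).each do |topic, values|
  puts "#{topic}: #{values.size} failed"
end
```

In the real method each batch is additionally decoded with the producer's schema backend before being reported, so the values here would correspond to encoded payloads.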
.send_produce_error(event) ⇒ void
This method returns an undefined value.
Listens for any exceptions that happen during publishing and re-publishes them as a Deimos event.
# File 'lib/deimos/instrumentation.rb', line 76

def self.send_produce_error(event)
  exception = event.payload[:exception_object]
  return unless exception

  if exception.respond_to?(:failed_messages)
    handle_exception_with_messages(exception)
  else
    Deimos.config.metrics&.increment(
      'publish_error',
      by: event.payload[:message_count] || 1
    )
  end
end
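The branching in the method above can be summarized as a small pure function. This is a simplified sketch under stated assumptions, not the library's implementation: `FakeBatchError` and `publish_error_count` are hypothetical names, the payload is modeled as a plain Hash, and the first branch simply counts all failed messages, whereas the real code reports them per topic and only for topics with a matching producer.

```ruby
# Hypothetical stand-in for an exception that exposes failed_messages.
class FakeBatchError < StandardError
  attr_reader :failed_messages

  def initialize(failed_messages)
    super('batch failed')
    @failed_messages = failed_messages
  end
end

# Returns how many failed messages an event payload represents.
# - no exception in the payload: nothing to report (0)
# - exception exposes failed_messages: count those messages
# - otherwise: fall back to :message_count, defaulting to 1
def publish_error_count(payload)
  exception = payload[:exception_object]
  return 0 unless exception

  if exception.respond_to?(:failed_messages)
    exception.failed_messages.size
  else
    payload[:message_count] || 1
  end
end

puts publish_error_count({ exception_object: FakeBatchError.new(%w(a b c)) })
puts publish_error_count({ exception_object: StandardError.new('boom'), message_count: 5 })
```

The `|| 1` fallback means an exception without any message information is still counted as a single publish error rather than being dropped.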