Module: Emque::Consuming::Consumer::Common

Defined in:
lib/emque/consuming/consumer/common.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.included(descendant) ⇒ Object



16
17
18
19
20
21
# File 'lib/emque/consuming/consumer/common.rb', line 16

# Mixin hook: runs when this module is included into +descendant+.
#
# Gives the including class a public +message+ reader and makes
# +handle_error+ and +pipe+ private on it.
# NOTE(review): +pipe+ is not defined in this module; presumably it is
# supplied by another mixin on the including class -- confirm.
#
# @param descendant [Module] the class or module including this one
def self.included(descendant)
  descendant.class_exec do
    attr_reader(:message)
    private(:handle_error, :pipe)
  end
end

Instance Method Details

#consume(handler_method, message) ⇒ Object



23
24
25
# File 'lib/emque/consuming/consumer/common.rb', line 23

# Dispatches +message+ to the handler named by +handler_method+.
#
# Uses +send+ so the handler may be private on the consumer.
#
# @param handler_method [Symbol, String] name of the method to invoke
# @param message [Object] passed as the handler's single argument
# @return [Object] whatever the handler returns
def consume(handler_method, message)
  self.send(handler_method, message)
end

#handle_error(e, method:, subject:) ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/emque/consuming/consumer/common.rb', line 35

# Reports an error raised while a message was being consumed.
#
# Builds a context hash describing the consumer and the message, logs the
# error, the context and (when present) the backtrace, invokes every
# configured error handler with the error and context, and finally passes
# the context to the application instance's +notice_error+.
#
# @param e [Exception] the error that was raised
# @param method [Object] stored in the context under +:pipe_method+
# @param subject [Object] the message being processed; must respond to
#   +values+, +original+, +offset+, +partition+ and +topic+
# @return [Object] the result of +notice_error+
def handle_error(e, method:, subject:)
  context = {
    :consumer => self.class.name,
    :message => { :current => subject.values, :original => subject.original },
    :offset => subject.offset,
    :partition => subject.partition,
    :pipe_method => method,
    :topic => subject.topic
  }

  # Always log, regardless of which handlers are configured.
  logger = Emque::Consuming.logger
  logger.error("Error consuming message #{e}")
  logger.error(context)
  trace = e.backtrace
  logger.error(trace.join("\n")) if trace

  Emque::Consuming.config.error_handlers.each { |handler| handler.call(e, context) }

  Emque::Consuming.application.instance.notice_error(context)
end

#pipe_config ⇒ Object



27
28
29
30
31
32
33
# File 'lib/emque/consuming/consumer/common.rb', line 27

# Returns the memoized Pipe configuration for this consumer.
#
# The configuration routes errors to +handle_error+, sets
# +:raise_on_error+ to true, and supplies a +:stop_on+ predicate that is
# true unless the message responds to +continue?+ and +continue?+ is truthy.
#
# @return [Pipe::Config] the same instance on every call
def pipe_config
  @pipe_config ||= begin
    # True when the message lacks `continue?` or reports it should not continue.
    halt_check = lambda do |msg, _, _|
      !msg.respond_to?(:continue?) || !msg.continue?
    end

    Pipe::Config.new(
      :error_handlers => [method(:handle_error)],
      :raise_on_error => true,
      :stop_on => halt_check
    )
  end
end