Module: Emque::Consuming::Consumer::Common
- Defined in:
- lib/emque/consuming/consumer/common.rb
Class Method Summary collapse
- .included(descendant) ⇒ Object
Instance Method Summary collapse
- #consume(handler_method, message) ⇒ Object
- #handle_error(e, method:, subject:) ⇒ Object
- #pipe_config ⇒ Object
Class Method Details
.included(descendant) ⇒ Object
16 17 18 19 20 21 |
# File 'lib/emque/consuming/consumer/common.rb', line 16

# Hook that Ruby invokes when this module is included into +descendant+.
#
# Re-opens the including class to:
# * expose a public +message+ reader, and
# * make +handle_error+ and +pipe+ private.
#
# NOTE(review): +pipe+ is not defined in this module; it is presumably
# supplied by another mixin on the consumer -- confirm. +private+ raises
# NameError when the descendant does not respond to one of the listed names.
#
# @param descendant [Class, Module] the class or module including this module
# @return [Object] the result of the +class_eval+ block
def self.included(descendant)
  descendant.class_eval do
    attr_reader :message
    private :handle_error, :pipe
  end
end
Instance Method Details
#consume(handler_method, message) ⇒ Object
23 24 25 |
# File 'lib/emque/consuming/consumer/common.rb', line 23

# Dispatches +message+ to the handler method named by +handler_method+.
#
# Uses +send+, so the handler is invoked regardless of its visibility.
#
# @param handler_method [Symbol, String] name of the method to invoke on this consumer
# @param message [Object] the message handed to the handler as its only argument
# @return [Object] whatever the handler method returns
def consume(handler_method, message)
  send(handler_method, message)
end
#handle_error(e, method:, subject:) ⇒ Object
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/emque/consuming/consumer/common.rb', line 35

# Reports an error raised while a message was being consumed.
#
# Steps, in order:
# 1. builds a context hash from this consumer's class name and +subject+;
# 2. logs the error, the context and (when present) the backtrace at error level;
# 3. calls every handler in +Emque::Consuming.config.error_handlers+ with
#    the error and the context;
# 4. passes the context to +Emque::Consuming.application.instance.notice_error+.
#
# @param e [#backtrace] the error being reported (presumably an Exception -- confirm)
# @param method [Object] recorded under +:pipe_method+ in the context
# @param subject [#values, #original, #offset, #partition, #topic] the message
#   being processed when the error occurred
# @return [Object] the return value of +notice_error+
def handle_error(e, method:, subject:)
  context = {
    :consumer => self.class.name,
    :message => {
      :current => subject.values,
      :original => subject.original
    },
    :offset => subject.offset,
    :partition => subject.partition,
    :pipe_method => method,
    :topic => subject.topic
  }

  # log the error by default
  Emque::Consuming.logger.error("Error consuming message #{e}")
  Emque::Consuming.logger.error(context)
  # An error that was never raised has no backtrace, so guard against nil.
  Emque::Consuming.logger.error e.backtrace.join("\n") unless e.backtrace.nil?

  Emque::Consuming.config.error_handlers.each do |handler|
    handler.call(e, context)
  end

  Emque::Consuming.application.instance.notice_error(context)
end
#pipe_config ⇒ Object
27 28 29 30 31 32 33 |
# File 'lib/emque/consuming/consumer/common.rb', line 27

# Returns the memoized +Pipe::Config+ for this consumer.
#
# The config is built once per instance with:
# * +:error_handlers+ -- this consumer's own +handle_error+ as a Method object;
# * +:raise_on_error+ -- +true+;
# * +:stop_on+ -- a lambda that is truthy unless the message responds to
#   +continue?+ and +continue?+ returns a truthy value.
#
# @return [Pipe::Config] the cached configuration object
def pipe_config
  @pipe_config ||= Pipe::Config.new(
    :error_handlers => [method(:handle_error)],
    :raise_on_error => true,
    :stop_on => ->(msg, _, _) {
      !(msg.respond_to?(:continue?) && msg.continue?)
    }
  )
end