Class: Deimos::Utils::SeekListener
- Inherits:
-
Phobos::Listener
- Object
- Phobos::Listener
- Deimos::Utils::SeekListener
- Defined in:
- lib/deimos/utils/inline_consumer.rb
Overview
Listener that can seek to get the last X messages in a topic.
Instance Attribute Summary collapse
-
#num_messages ⇒ Object
Returns the value of attribute num_messages.
Instance Method Summary collapse
-
#start_listener ⇒ Object
:nodoc:.
Instance Attribute Details
#num_messages ⇒ Object
Returns the value of attribute num_messages.
9 10 11 |
# File 'lib/deimos/utils/inline_consumer.rb', line 9 def end |
Instance Method Details
#start_listener ⇒ Object
:nodoc:
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/deimos/utils/inline_consumer.rb', line 12 def start_listener ||= 10 @consumer = create_kafka_consumer @consumer.subscribe(topic, @subscribe_opts) begin last_offset = @kafka_client.last_offset_for(topic, 0) offset = last_offset - if offset.positive? Deimos.config.logger.info("Seeking to #{offset}") @consumer.seek(topic, 0, offset) end rescue StandardError => e "Could not seek to offset: #{e.message}" end instrument('listener.start_handler', ) do @handler_class.start(@kafka_client) end log_info('Listener started', ) end |