Class: Messaging::Adapters::Kafka::Consumer
- Inherits:
-
Object
- Object
- Messaging::Adapters::Kafka::Consumer
- Extended by:
- Dry::Initializer
- Includes:
- Instrumentation, Routing
- Defined in:
- lib/messaging/adapters/kafka/consumer.rb
Overview
Internal: Wraps a Ruby-Kafka consumer
Subscribes to topics and dispatches messages to a handler for doing the actual work.
Constant Summary
Constants included from Instrumentation
Instance Method Summary collapse
Methods included from Routing
#clear_routes!, #handle, included, #on
Methods included from Instrumentation
#instrument, subscribe, unsubscribe
Instance Method Details
#log_current_status ⇒ Object
36 37 38 39 40 |
# File 'lib/messaging/adapters/kafka/consumer.rb', line 36 def log_current_status logger.info "Current offsets for #{name}:" logger.info '|- no messages yet' if current_offsets.empty? current_offsets.each { |partition, offset| logger.info "|- #{partition}: #{offset}" } end |
#start ⇒ Object
21 22 23 24 25 26 27 28 |
# File 'lib/messaging/adapters/kafka/consumer.rb', line 21 def start logger.info "Consumer #{name} started" subscribe_kafka_consumer_to_topics @running = true ensure stop if @running end |
#stop ⇒ Object
30 31 32 33 34 |
# File 'lib/messaging/adapters/kafka/consumer.rb', line 30 def stop logger.info "Consumer #{name} stopping" @running = false consumer&.stop end |