Class: Messaging::Adapters::Kafka::Consumer

Inherits:
Object
  • Object
show all
Extended by:
Dry::Initializer
Includes:
Instrumentation, Routing
Defined in:
lib/messaging/adapters/kafka/consumer.rb

Overview

Internal: Wraps a Ruby-Kafka consumer

Subscribes to topics and dispatches messages to a handler for doing the actual work.

Constant Summary

Constants included from Instrumentation

Instrumentation::NAMESPACE

Instance Method Summary collapse

Methods included from Routing

#clear_routes!, #handle, included, #on

Methods included from Instrumentation

#instrument, subscribe, unsubscribe

Instance Method Details

#log_current_status ⇒ Object



36
37
38
39
40
# File 'lib/messaging/adapters/kafka/consumer.rb', line 36

# Public: Log the offsets currently reported by current_offsets.
#
# Writes a header line naming the consumer, then a placeholder line when
# current_offsets is empty, then one "|- key: value" line per entry.
# All lines are logged at info level.
#
# Returns whatever current_offsets.each returns.
def log_current_status
  logger.info("Current offsets for #{name}:")
  logger.info('|- no messages yet') if current_offsets.empty?

  current_offsets.each do |partition_id, position|
    logger.info("|- #{partition_id}: #{position}")
  end
end

#start ⇒ Object



21
22
23
24
25
26
27
28
# File 'lib/messaging/adapters/kafka/consumer.rb', line 21

# Public: Start the consumer.
#
# Logs a start message, calls subscribe_kafka_consumer_to_topics, sets the
# @running flag and then hands control to process_messages. When the method
# exits, normally or through an exception, stop is invoked if @running is
# still truthy.
#
# Returns the result of process_messages.
def start
  logger.info("Consumer #{name} started")
  subscribe_kafka_consumer_to_topics
  @running = true
  process_messages
ensure
  # The flag is only raised after subscribing, so a failure before that
  # point skips stop unless @running was already set beforehand.
  stop if @running
end

#stop ⇒ Object



30
31
32
33
34
# File 'lib/messaging/adapters/kafka/consumer.rb', line 30

# Public: Stop the consumer.
#
# Logs a stopping message, clears the @running flag and forwards stop to
# the wrapped consumer when one is present.
#
# Returns the result of consumer.stop, or nil when consumer is nil.
def stop
  logger.info("Consumer #{name} stopping")
  @running = false

  wrapped = consumer
  wrapped&.stop
end