Class: Messaging::Adapters::Kafka

Inherits:
Object
  • Object
show all
Defined in:
lib/messaging/adapters/kafka.rb,
lib/messaging/adapters/kafka/consumer.rb,
lib/messaging/adapters/kafka/producer.rb

Overview

Internal: Adapter for producing and consuming messages with Kafka

Defined Under Namespace

Classes: Consumer, Producer

Instance Method Summary collapse

Instance Method Details

#clientObject

Internal: Ruby-Kafka client for the current thread.

We keep one per thread as the client itself is not meant to be shared. See github.com/zendesk/ruby-kafka#thread-safety for more information.



14
15
16
# File 'lib/messaging/adapters/kafka.rb', line 14

def client
  Thread.current[:messaging_kafka_client] ||= create_kafka_client
end

#create_consumer(name, **options) ⇒ Object



24
25
26
# File 'lib/messaging/adapters/kafka.rb', line 24

def create_consumer(name, **options)
  Consumer.new(name: name, kafka_adapter: self, **options)
end

#create_kafka_consumer(group_id) ⇒ Object

Internal: Setup a Ruby-Kafka consumer



29
30
31
# File 'lib/messaging/adapters/kafka.rb', line 29

def create_kafka_consumer(group_id)
  client.consumer(**{ group_id: group_id }.merge(Config.kafka.consumer.to_h))
end

#dispatcherObject

The producer doesn’t need to be per thread as it uses a background thread to deliver messages.



20
21
22
# File 'lib/messaging/adapters/kafka.rb', line 20

def dispatcher
  @dispatcher ||= Producer.new(self)
end