Class: Messaging::Adapters::Kafka
- Inherits:
-
Object
- Object
- Messaging::Adapters::Kafka
- Defined in:
- lib/messaging/adapters/kafka.rb,
lib/messaging/adapters/kafka/consumer.rb,
lib/messaging/adapters/kafka/producer.rb
Overview
Internal: Adapter for producing and consuming messages with Kafka
Defined Under Namespace
Instance Method Summary collapse
-
#client ⇒ Object
Internal: Ruby-Kafka client for the current thread.
- #create_consumer(name, **options) ⇒ Object
-
#create_kafka_consumer(group_id) ⇒ Object
Internal: Setup a Ruby-Kafka consumer.
-
#dispatcher ⇒ Object
The producer doesn’t need to be per thread as it uses a background thread to deliver messages.
Instance Method Details
#client ⇒ Object
Internal: Ruby-Kafka client for the current thread.
We keep one per thread as the client itself is not meant to be shared. See github.com/zendesk/ruby-kafka#thread-safety for more information.
14 15 16 |
# File 'lib/messaging/adapters/kafka.rb', line 14 def client Thread.current[:messaging_kafka_client] ||= create_kafka_client end |
#create_consumer(name, **options) ⇒ Object
24 25 26 |
# File 'lib/messaging/adapters/kafka.rb', line 24 def create_consumer(name, **) Consumer.new(name: name, kafka_adapter: self, **) end |
#create_kafka_consumer(group_id) ⇒ Object
Internal: Setup a Ruby-Kafka consumer
29 30 31 |
# File 'lib/messaging/adapters/kafka.rb', line 29 def create_kafka_consumer(group_id) client.consumer(**{ group_id: group_id }.merge(Config.kafka.consumer.to_h)) end |