Class: ROM::Kafka::Connection::Consumer Private
- Inherits:
- ROM::Kafka::Connection
  - Object
    - ROM::Kafka::Connection
      - ROM::Kafka::Connection::Consumer
- Includes:
- Enumerable
- Defined in:
- lib/rom/kafka/connection/consumer.rb
Overview
This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.
The consumer-specific connection to a Kafka cluster.
It wraps the `Poseidon::PartitionConsumer` driver (see DRIVER) and is responsible for adapting the poseidon API to ROM::Dataset via the [#initialize] and [#each] methods.
The ROM::Kafka consumer deals with tuples, hiding the poseidon-specific implementation of fetched messages from the rest of the gem.
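To illustrate what "dealing with tuples" can look like, here is a minimal sketch of converting a driver-specific message into a plain hash. The `SketchMessage` struct and the `to_tuple` helper are hypothetical stand-ins, and the attribute names (`value`, `topic`, `key`, `offset`) are assumptions, not taken from the gem's source.

```ruby
# Hypothetical stand-in for a message object returned by the driver.
# The attribute names are assumptions made for this sketch.
SketchMessage = Struct.new(:topic, :value, :key, :offset)

# Converts a driver-specific message into a tuple:
# a Hash with symbol keys and String/Integer values.
def to_tuple(message)
  {
    value:  message.value,
    topic:  message.topic,
    key:    message.key,
    offset: message.offset
  }
end

message = SketchMessage.new("logs", "hello", "k1", 42)
tuple   = to_tuple(message)
```

With a conversion of this kind, the rest of the code only sees hashes and never depends on the driver's message class.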
Constant Summary
- DRIVER = Poseidon::PartitionConsumer
This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.
The poseidon-specific class describing consumers.
Instance Attribute Summary
- #connection ⇒ ROM::Kafka::Connection::Consumer::DRIVER (readonly, private)
  Driver to Kafka.
Instance Method Summary
- #each {|tuple| ... } ⇒ Enumerator<Array<Hash{Symbol => String, Integer}>> (private)
  Iterates through Kafka messages.
- #fetch ⇒ Array<Hash{Symbol => String, Integer}> (private)
  Fetches a single portion of messages and converts them to tuples.
- #initialize(opts) ⇒ Consumer (constructor, private)
  Initializes a consumer connection.
Constructor Details
#initialize(opts) ⇒ Consumer
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Initializes a consumer connection.
The initializer is attribute-agnostic: it doesn't validate the options it receives and silently skips any it does not use.
@todo: refactor using factory method Connection.build_consumer
    # File 'lib/rom/kafka/connection/consumer.rb', line 59
    def initialize(opts)
      super # takes declared attributes from options
      args = opts.values_at(:client_id, :brokers, :topic, :partition, :offset)
      @connection = DRIVER.consumer_for_partition(*args, attributes)
      @mutex = Mutex.new
    end
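The "attribute-agnostic" behaviour follows from how `Hash#values_at` works: it picks the listed keys in order, ignores everything else, and returns `nil` for keys that are absent. A small sketch with hypothetical option values (the client id, broker address and topic below are made up for illustration):

```ruby
# Hypothetical options; :unused is not one of the keys the initializer reads.
opts = {
  client_id: "my_client",
  brokers:   ["localhost:9092"],
  topic:     "logs",
  partition: 0,
  offset:    0,
  unused:    "ignored"
}

# The same call as in the initializer: values come back in the order
# of the listed keys, and :unused is simply skipped.
args = opts.values_at(:client_id, :brokers, :topic, :partition, :offset)

# A missing key does not raise; its position is filled with nil.
partial = { topic: "logs" }.values_at(:client_id, :topic)
```

Because nothing is validated at this point, a missing option only surfaces later, when the driver receives `nil` in that position.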
Instance Attribute Details
#connection ⇒ ROM::Kafka::Connection::Consumer::DRIVER (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns driver to Kafka.
    # File 'lib/rom/kafka/connection/consumer.rb', line 32
    def connection
      @connection
    end
Instance Method Details
#each {|tuple| ... } ⇒ Enumerator<Array<Hash{Symbol => String, Integer}>>
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Iterates through Kafka messages.
Keeps fetching the next portion of messages until a fetch returns no messages.
    # File 'lib/rom/kafka/connection/consumer.rb', line 82
    def each
      return to_enum unless block_given?
      loop do
        tuples = fetch
        break unless tuples.any?
        tuples.each { |tuple| yield(tuple) }
      end
    end
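The fetch-until-empty loop can be tried without a Kafka cluster. The sketch below uses a hypothetical `BatchedSource` class that serves predefined batches in place of the real driver; only the body of `each` mirrors the method above.

```ruby
# Hypothetical stand-in: serves predefined batches instead of calling Kafka.
class BatchedSource
  include Enumerable

  def initialize(batches)
    @batches = batches.dup
  end

  # Returns the next batch, or an empty array when nothing is left.
  def fetch
    @batches.shift || []
  end

  # Same loop as in Consumer#each: yield tuples batch by batch
  # and stop at the first empty batch.
  def each
    return to_enum unless block_given?

    loop do
      tuples = fetch
      break unless tuples.any?
      tuples.each { |tuple| yield(tuple) }
    end
  end
end

source = BatchedSource.new([[{ value: "a" }, { value: "b" }], [{ value: "c" }]])
values = source.map { |tuple| tuple[:value] } # Enumerable#map is built on #each

# Without a block, #each returns an Enumerator.
enum = BatchedSource.new([[{ value: "x" }]]).each
```

Note that iteration of this kind is stateful: once the batches are consumed, a second pass over the same object yields nothing.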
#fetch ⇒ Array<Hash{Symbol => String, Integer}>
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Fetches a single portion of messages and converts them to tuples.
    # File 'lib/rom/kafka/connection/consumer.rb', line 70
    def fetch
      @mutex.synchronize { @connection.fetch }.map(&method(:tuple))
    end
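In `#fetch`, only the driver call is wrapped in `@mutex.synchronize`; the conversion to tuples runs outside the lock. The following sketch shows the same pattern with several threads sharing one reader. `QueueDriver` and `SafeReader` are hypothetical classes written for this example, and the batch size of two is an arbitrary assumption.

```ruby
require "thread"

# Hypothetical stand-in for a driver whose #fetch mutates internal state.
class QueueDriver
  def initialize(messages)
    @messages = messages.dup
  end

  # Returns up to two messages per call (arbitrary batch size for the sketch).
  def fetch
    @messages.shift(2)
  end
end

# Hypothetical reader that guards the driver with a mutex, as Consumer#fetch does.
class SafeReader
  def initialize(driver)
    @driver = driver
    @mutex  = Mutex.new
  end

  # Only the driver call is serialized; mapping to tuples happens outside the lock.
  def fetch
    @mutex.synchronize { @driver.fetch }.map { |value| { value: value } }
  end
end

reader  = SafeReader.new(QueueDriver.new((1..100).to_a))
results = Queue.new # thread-safe collector

threads = 4.times.map do
  Thread.new do
    loop do
      tuples = reader.fetch
      break if tuples.empty?
      tuples.each { |tuple| results << tuple[:value] }
    end
  end
end
threads.each(&:join)

collected = []
collected << results.pop until results.empty?
```

Keeping the critical section this small means other threads wait only for the driver call itself, not for the per-message conversion.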