Class: Poseidon::PartitionConsumer
- Inherits: Object
  - Object
  - Poseidon::PartitionConsumer
- Defined in: lib/poseidon/partition_consumer.rb
Overview
A primitive Kafka Consumer which operates on a specific broker, topic and partition.
See the README for a usage example.
Instance Attribute Summary
- #highwater_mark ⇒ Object readonly
  The offset of the latest message the broker received for this partition.
- #host ⇒ Object readonly
- #offset ⇒ Object readonly
- #port ⇒ Object readonly
- #topic ⇒ Object readonly
Class Method Summary
- .consumer_for_partition(client_id, seed_brokers, topic, partition, offset, options = {}) ⇒ Object
  Returns a consumer pointing at the lead broker for the partition.
Instance Method Summary
- #close ⇒ Nil
  Closes the connection to the Kafka broker.
- #fetch(options = {}) ⇒ Object
  Fetches messages from the broker.
- #initialize(client_id, host, port, topic, partition, offset, options = {}) ⇒ PartitionConsumer (constructor)
  Creates a new consumer which reads the specified topic and partition from the host.
- #next_offset ⇒ Integer
  Returns the next offset that will be fetched.
Constructor Details
#initialize(client_id, host, port, topic, partition, offset, options = {}) ⇒ PartitionConsumer
Create a new consumer which reads the specified topic and partition from the host.
# File 'lib/poseidon/partition_consumer.rb', line 69

def initialize(client_id, host, port, topic, partition, offset, options = {})
  @host = host
  @port = port

  ()

  @connection = Connection.new(host, port, client_id, @socket_timeout_ms)
  @topic = topic
  @partition = partition
  if Symbol === offset
    raise ArgumentError, "Unknown special offset type: #{offset}" unless [:earliest_offset, :latest_offset].include?(offset)
  end
  @offset = offset
end
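The offset check in the constructor can be tried in isolation. The following is a minimal sketch, not Poseidon code: `validate_offset` and `SPECIAL_OFFSETS` are hypothetical names introduced here, and the sketch only mirrors the rule visible in the source above (a Symbol offset must be `:earliest_offset` or `:latest_offset`; other values pass through unchecked).

```ruby
# Hypothetical helper mirroring the constructor's offset check.
# Neither the constant nor the method is part of Poseidon.
SPECIAL_OFFSETS = [:earliest_offset, :latest_offset].freeze

def validate_offset(offset)
  # Only Symbol offsets are validated; integers and other values pass through.
  if offset.is_a?(Symbol) && !SPECIAL_OFFSETS.include?(offset)
    raise ArgumentError, "Unknown special offset type: #{offset}"
  end
  offset
end

validate_offset(:earliest_offset) # accepted: known special offset
validate_offset(42)               # accepted: non-Symbol offsets are not checked

begin
  validate_offset(:newest)
rescue ArgumentError => e
  puts e.message # prints "Unknown special offset type: newest"
end
```

Failing fast here means a misspelled special offset surfaces when the consumer is constructed rather than on the first fetch.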
Instance Attribute Details
#highwater_mark ⇒ Object (readonly)
The offset of the latest message the broker received for this partition. Useful for knowing how far behind the consumer is. This value is only as recent as the last fetch call.
# File 'lib/poseidon/partition_consumer.rb', line 11

def highwater_mark
  @highwater_mark
end
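One way to estimate how far behind a consumer is, sketched under stated assumptions: `LagProbe` is a hypothetical stand-in exposing only `highwater_mark` and `next_offset`, and `approximate_lag` is a name introduced here. The sketch assumes the two values are directly comparable; depending on the broker's high-water mark semantics the result may be off by one, so treat it as an approximation.

```ruby
# Hypothetical stand-in exposing the two readers used below. A real
# PartitionConsumer would be queried the same way after a fetch call.
LagProbe = Struct.new(:highwater_mark, :next_offset)

# Approximate number of messages still to be read.
# Returns nil when no high-water mark is known yet (no fetch has happened),
# and never returns a negative number.
def approximate_lag(consumer)
  mark = consumer.highwater_mark
  return nil if mark.nil?
  [mark - consumer.next_offset, 0].max
end

puts approximate_lag(LagProbe.new(120, 100))       # prints 20
puts approximate_lag(LagProbe.new(nil, 0)).inspect # prints nil
```

Because the high-water mark is only refreshed by fetch, the estimate is only as fresh as the most recent fetch call.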
#host ⇒ Object (readonly)
# File 'lib/poseidon/partition_consumer.rb', line 13

def host
  @host
end
#offset ⇒ Object (readonly)
# File 'lib/poseidon/partition_consumer.rb', line 15

def offset
  @offset
end
#port ⇒ Object (readonly)
# File 'lib/poseidon/partition_consumer.rb', line 13

def port
  @port
end
#topic ⇒ Object (readonly)
# File 'lib/poseidon/partition_consumer.rb', line 17

def topic
  @topic
end
Class Method Details
.consumer_for_partition(client_id, seed_brokers, topic, partition, offset, options = {}) ⇒ Object
Returns a consumer pointing at the lead broker for the partition.
This is a stop-gap; it will eventually be replaced by higher-level consumer functionality.
# File 'lib/poseidon/partition_consumer.rb', line 24

def self.consumer_for_partition(client_id, seed_brokers, topic, partition, offset, options = {})
  broker = BrokerPool.open(client_id, seed_brokers, options[:socket_timeout_ms] || 10_000) do |broker_pool|
    = ClusterMetadata.new
    .update(broker_pool.([topic]))

    .lead_broker_for_partition(topic, partition)
  end

  new(client_id, broker.host, broker.port, topic, partition, offset, options)
end
Instance Method Details
#close ⇒ Nil
Closes the connection to the Kafka broker.

# File 'lib/poseidon/partition_consumer.rb', line 145

def close
  @connection.close
  nil
end
#fetch(options = {}) ⇒ Object
Fetch messages from the broker.
# File 'lib/poseidon/partition_consumer.rb', line 98

def fetch(options = {})
  fetch_max_wait = options.delete(:max_wait_ms) || max_wait_ms
  fetch_max_bytes = options.delete(:max_bytes) || max_bytes
  fetch_min_bytes = options.delete(:min_bytes) || min_bytes

  if options.keys.any?
    raise ArgumentError, "Unknown options: #{options.keys.inspect}"
  end

  topic_fetches = build_topic_fetch_request(fetch_max_bytes)
  fetch_response = @connection.fetch(fetch_max_wait, fetch_min_bytes, topic_fetches)
  topic_response = fetch_response.topic_fetch_responses.first
  partition_response = topic_response.partition_fetch_responses.first

  unless partition_response.error == Errors::NO_ERROR_CODE
    if @offset < 0 && Errors::ERROR_CODES[partition_response.error] == Errors::OffsetOutOfRange
      @offset = :earliest_offset
      return fetch()
    end

    raise Errors::ERROR_CODES[partition_response.error]
  else
    @highwater_mark = partition_response.highwater_mark_offset
    = partition_response..flatten.map do |m|
      FetchedMessage.new(topic_response.topic, m.value, m.key, m.offset)
    end
    if .any?
      @offset = .last.offset + 1
    end
  end
end
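A polling loop around `fetch` and `close` might look like the sketch below. It runs without a broker by using stand-ins: `CannedConsumer`, `CannedMessage`, and `drain` are hypothetical names introduced here, not Poseidon classes. The sketch assumes `fetch` returns an array of messages that respond to `value`, as the README's usage suggests, and passes `max_wait_ms`, one of the options the source above accepts.

```ruby
# Hypothetical stand-ins so the loop runs without a Kafka broker.
CannedMessage = Struct.new(:value, :offset)

class CannedConsumer
  attr_reader :closed

  def initialize(batches)
    @batches = batches
    @closed = false
  end

  # Returns the next canned batch, or an empty array once exhausted.
  def fetch(options = {})
    @batches.shift || []
  end

  def close
    @closed = true
    nil
  end
end

# Fetches up to max_batches batches, collects message values, stops early
# on an empty batch, and closes the consumer even if fetch raises.
def drain(consumer, max_batches)
  values = []
  max_batches.times do
    messages = consumer.fetch(max_wait_ms: 100)
    break if messages.empty?
    values.concat(messages.map(&:value))
  end
  values
ensure
  consumer.close
end

consumer = CannedConsumer.new([
  [CannedMessage.new("a", 0), CannedMessage.new("b", 1)],
  [CannedMessage.new("c", 2)]
])
p drain(consumer, 5) # prints ["a", "b", "c"]
p consumer.closed    # prints true
```

Putting `close` in an `ensure` clause keeps the connection from leaking when `fetch` raises one of the broker error classes.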
#next_offset ⇒ Integer
Returns the next offset that will be fetched.

# File 'lib/poseidon/partition_consumer.rb', line 135

def next_offset
  resolve_offset_if_necessary
  @offset
end