Method: Poseidon::PartitionConsumer#fetch
- Defined in:
- lib/poseidon/partition_consumer.rb
#fetch(options = {}) ⇒ Object
Fetch messages from the broker.
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 |
# File 'lib/poseidon/partition_consumer.rb', line 98 def fetch( = {}) fetch_max_wait = .delete(:max_wait_ms) || max_wait_ms fetch_max_bytes = .delete(:max_bytes) || max_bytes fetch_min_bytes = .delete(:min_bytes) || min_bytes if .keys.any? raise ArgumentError, "Unknown options: #{options.keys.inspect}" end topic_fetches = build_topic_fetch_request(fetch_max_bytes) fetch_response = @connection.fetch(fetch_max_wait, fetch_min_bytes, topic_fetches) topic_response = fetch_response.topic_fetch_responses.first partition_response = topic_response.partition_fetch_responses.first unless partition_response.error == Errors::NO_ERROR_CODE if @offset < 0 && Errors::ERROR_CODES[partition_response.error] == Errors::OffsetOutOfRange @offset = :earliest_offset return fetch() end raise Errors::ERROR_CODES[partition_response.error] else @highwater_mark = partition_response.highwater_mark_offset = partition_response..flatten.map do |m| FetchedMessage.new(topic_response.topic, m.value, m.key, m.offset) end if .any? @offset = .last.offset + 1 end end end |