Method: Poseidon::PartitionConsumer#fetch

Defined in:
lib/poseidon/partition_consumer.rb

#fetch(options = {}) ⇒ Object

Fetch messages from the broker.

Parameters:

  • options (Hash) (defaults to: {})

Options Hash (options):

  • :max_bytes

    Maximum number of bytes to fetch.

  • :max_wait_ms

    How long to block until the server sends us data.

  • :min_bytes

    Smallest amount of data the server should send us.



98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/poseidon/partition_consumer.rb', line 98

# Fetch messages from the broker.
#
# Any option that is omitted falls back to the consumer-level value
# returned by the reader of the same name (max_wait_ms, max_bytes,
# min_bytes).
#
# @param [Hash] options
#
# @option options [Object] :max_bytes
#   Maximum number of bytes to fetch
#
# @option options [Object] :max_wait_ms
#   How long to block until the server sends us data.
#
# @option options [Object] :min_bytes
#   Smallest amount of data the server should send us.
#
# @raise [ArgumentError] if +options+ contains any other key.
# @raise the class registered in Errors::ERROR_CODES for the partition's
#   error code, when the broker reports an error that is not recovered here.
#
# @return [Array<FetchedMessage>] the fetched messages; empty when the
#   response's message set is empty.
def fetch(options = {})
  # Consume keys from a copy: the caller's hash must stay intact, both as a
  # courtesy and because the out-of-range retry below re-sends it verbatim.
  remaining = options.dup
  fetch_max_wait = remaining.delete(:max_wait_ms) || max_wait_ms
  fetch_max_bytes = remaining.delete(:max_bytes) || max_bytes
  fetch_min_bytes = remaining.delete(:min_bytes) || min_bytes

  unless remaining.empty?
    raise ArgumentError, "Unknown options: #{remaining.keys.inspect}"
  end

  topic_fetches = build_topic_fetch_request(fetch_max_bytes)
  fetch_response = @connection.fetch(fetch_max_wait, fetch_min_bytes, topic_fetches)
  topic_response = fetch_response.topic_fetch_responses.first
  partition_response = topic_response.partition_fetch_responses.first

  if partition_response.error != Errors::NO_ERROR_CODE
    error_class = Errors::ERROR_CODES[partition_response.error]

    # NOTE(review): a negative @offset presumably denotes a position relative
    # to the end of the log -- confirm against the offset resolution code.
    # When the broker rejects it as out of range, restart from the earliest
    # offset and retry with the caller's original options.
    if @offset < 0 && error_class == Errors::OffsetOutOfRange
      @offset = :earliest_offset
      return fetch(options)
    end

    raise error_class
  end

  @highwater_mark = partition_response.highwater_mark_offset
  messages = partition_response.message_set.flatten.map do |m|
    FetchedMessage.new(topic_response.topic, m.value, m.key, m.offset)
  end

  # Move @offset just past the last message returned by this fetch.
  @offset = messages.last.offset + 1 if messages.any?
  messages
end