Class: Poseidon::PartitionConsumer

Inherits:
Object
Defined in:
lib/poseidon/partition_consumer.rb

Overview

A primitive Kafka Consumer which operates on a specific broker, topic and partition.

See the README for a usage example.

Constructor Details

#initialize(client_id, host, port, topic, partition, offset, options = {}) ⇒ PartitionConsumer

Create a new consumer which reads the specified topic and partition from the host.

Parameters:

  • client_id (String)

    Used to identify this client; should be unique.

  • host (String)
  • port (Integer)
  • topic (String)

    Topic to read from

  • partition (Integer)

    Partitions are zero indexed.

  • offset (Integer, Symbol)

    Offset to start reading from. A negative offset can also be passed. Two special offsets can be passed as symbols (any other symbol raises ArgumentError):

      • :earliest_offset  Start reading from the first offset the server has.
      • :latest_offset  Start reading from the latest offset the server has.

  • options (Hash) (defaults to: {})

    These options, except :socket_timeout_ms, can be overridden in each individual #fetch call.

Options Hash (options):

  • :max_bytes (Integer)

    Maximum number of bytes to fetch. Default: 1048576 (1 MB)

  • :max_wait_ms (Integer)

    How long to block until the server sends us data. NOTE: This is only enforced if min_bytes is > 0. Default: 100 (100ms)

  • :min_bytes (Integer)

    Smallest amount of data the server should send us. Default: 1 (Send us data as soon as it is ready)

  • :socket_timeout_ms (Integer)

    How long to wait for a reply from the server. Should be higher than max_wait_ms. Default: 10000 (10s)
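The defaults above, and the rule that only some of them can change per fetch, can be modelled in a few lines of plain Ruby. This is a sketch, not Poseidon's handle_options: the helper names `constructor_options` and `fetch_options` are hypothetical, the default values are the documented ones, the ArgumentError for unknown keys mirrors the check in #fetch, and the warning simply encodes the advice that socket_timeout_ms should be higher than max_wait_ms.

```ruby
# Defaults documented for PartitionConsumer's options hash.
DEFAULTS = {
  max_bytes: 1_048_576,     # 1 MB
  max_wait_ms: 100,         # 100 ms; only enforced when min_bytes > 0
  min_bytes: 1,             # send data as soon as it is ready
  socket_timeout_ms: 10_000 # 10 s
}.freeze

# Keys that a single fetch call may override (hypothetical constant).
FETCH_OVERRIDABLE = [:max_bytes, :max_wait_ms, :min_bytes].freeze

# Hypothetical helper: constructor options layered over the defaults.
def constructor_options(options = {})
  merged = DEFAULTS.merge(options)
  if merged[:socket_timeout_ms] <= merged[:max_wait_ms]
    warn "socket_timeout_ms should be higher than max_wait_ms"
  end
  merged
end

# Hypothetical helper: per-call overrides win over the constructor settings.
# Hash#merge returns a new hash, so the caller's hash is never modified.
def fetch_options(base, overrides = {})
  unknown = overrides.keys - FETCH_OVERRIDABLE
  raise ArgumentError, "Unknown options: #{unknown.inspect}" if unknown.any?
  base.slice(*FETCH_OVERRIDABLE).merge(overrides)
end

base = constructor_options(max_wait_ms: 500)
per_call = fetch_options(base, max_bytes: 65_536)
puts per_call[:max_bytes]   # prints 65536
puts per_call[:max_wait_ms] # prints 500
```

One design difference worth noting: the #fetch listing below removes recognised keys from the hash it is given (`options.delete`), whereas this sketch leaves the caller's hash untouched.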


# File 'lib/poseidon/partition_consumer.rb', line 71

def initialize(client_id, host, port, topic, partition, offset, options = {})
  @host = host
  @port = port

  handle_options(options)

  @connection = Connection.new(host, port, client_id, @socket_timeout_ms)
  @topic = topic
  @partition = partition
  if Symbol === offset
    raise ArgumentError, "Unknown special offset type: #{offset}" unless [:earliest_offset, :latest_offset].include?(offset)
  end
  @offset = offset
end
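The constructor only checks that a symbolic offset is one of the two supported names; turning the offset into a concrete position appears to happen later (see #next_offset). As a rough illustration, the hypothetical helper below (`resolve_start_offset`, not a Poseidon method) maps all three kinds of offset to an integer, given the partition's earliest and latest offsets. Two behaviours are assumptions: that a negative offset counts back from the latest offset, and that counting back too far lands on the earliest offset, which is similar in spirit to the :earliest_offset fallback in #fetch.

```ruby
# Symbols the constructor accepts as special offsets.
SPECIAL_OFFSETS = [:earliest_offset, :latest_offset].freeze

# Hypothetical helper (not part of Poseidon): turn the offset argument into
# a concrete offset, given the partition's earliest and latest offsets.
def resolve_start_offset(offset, earliest:, latest:)
  case offset
  when Symbol
    # Same check the constructor performs on symbolic offsets.
    unless SPECIAL_OFFSETS.include?(offset)
      raise ArgumentError, "Unknown special offset type: #{offset}"
    end
    offset == :earliest_offset ? earliest : latest
  when Integer
    return offset if offset >= 0
    # Assumption: a negative offset counts back from the latest offset.
    candidate = latest + offset
    # Assumption: counting back past the start falls back to the earliest offset.
    candidate < earliest ? earliest : candidate
  else
    raise ArgumentError, "offset must be an Integer or a Symbol"
  end
end

puts resolve_start_offset(:latest_offset, earliest: 0, latest: 500) # prints 500
puts resolve_start_offset(-10, earliest: 0, latest: 500)            # prints 490
puts resolve_start_offset(-1000, earliest: 0, latest: 500)          # prints 0
```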

Instance Attribute Details

#highwater_mark ⇒ Object (readonly)

The offset of the latest message the broker received for this partition. Useful for knowing how far behind the consumer is. This value is only as recent as the last #fetch call.


# File 'lib/poseidon/partition_consumer.rb', line 11

def highwater_mark
  @highwater_mark
end
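Comparing the highwater mark with #next_offset gives a rough measure of lag. The sketch below is a hypothetical helper (`consumer_lag`, not part of Poseidon) that takes both values as plain integers. It assumes, as in the Kafka fetch protocol, that the highwater mark is the log-end offset (one past the last message in the log); if it instead pointed at the last message itself, the result would be off by one. Since the highwater mark is only refreshed by #fetch, the figure is only as current as the last fetch.

```ruby
# Hypothetical helper: how many messages the consumer still has to read.
# Assumes highwater_mark is the log-end offset (one past the last message)
# and next_offset is the offset the consumer will request next.
def consumer_lag(highwater_mark, next_offset)
  return nil if highwater_mark.nil? # no fetch has happened yet
  lag = highwater_mark - next_offset
  lag.negative? ? 0 : lag # clamp at zero defensively
end

puts consumer_lag(1_200, 1_150) # prints 50
```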

#host ⇒ Object (readonly)


# File 'lib/poseidon/partition_consumer.rb', line 13

def host
  @host
end

#offset ⇒ Object (readonly)


# File 'lib/poseidon/partition_consumer.rb', line 15

def offset
  @offset
end

#port ⇒ Object (readonly)


# File 'lib/poseidon/partition_consumer.rb', line 13

def port
  @port
end

#topic ⇒ Object (readonly)


# File 'lib/poseidon/partition_consumer.rb', line 17

def topic
  @topic
end

Class Method Details

.consumer_for_partition(client_id, seed_brokers, topic, partition, offset, options = {}) ⇒ Object

Returns a consumer pointing at the lead broker for the partition.

This is a stop-gap; it will eventually be replaced by higher-level consumer functionality.


# File 'lib/poseidon/partition_consumer.rb', line 24

def self.consumer_for_partition(client_id, seed_brokers, topic, partition, offset, options = {})

  broker = BrokerPool.open(client_id, seed_brokers, options[:socket_timeout_ms] || 10_000) do |broker_pool|
    cluster_metadata = ClusterMetadata.new
    cluster_metadata.update(broker_pool.fetch_metadata([topic]))

    cluster_metadata.lead_broker_for_partition(topic, partition)
  end

  new(client_id, broker.host, broker.port, topic, partition, offset, options)
end
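Conceptually, consumer_for_partition asks the seed brokers for the topic's metadata, picks the broker that leads the requested partition, and builds a consumer for that broker's host and port. The sketch below models only the lookup step on a hypothetical in-memory metadata hash; the hash layout, the `Broker` struct, and the `lead_broker_for` helper are illustrative assumptions, not the structure of Poseidon's ClusterMetadata. The only thing it shares with the listing above is that the chosen broker exposes `host` and `port`.

```ruby
# Hypothetical broker descriptor with the fields the listing above reads.
Broker = Struct.new(:id, :host, :port)

# Hypothetical metadata layout: broker id => Broker, and
# [topic, partition] => id of the broker leading that partition.
METADATA = {
  brokers: {
    1 => Broker.new(1, "kafka1.example", 9092),
    2 => Broker.new(2, "kafka2.example", 9092)
  },
  leaders: {
    ["events", 0] => 2,
    ["events", 1] => 1
  }
}.freeze

# Hypothetical helper: find the broker that leads a topic partition.
def lead_broker_for(metadata, topic, partition)
  leader_id = metadata[:leaders][[topic, partition]]
  raise ArgumentError, "No leader known for #{topic}/#{partition}" if leader_id.nil?
  metadata[:brokers].fetch(leader_id)
end

broker = lead_broker_for(METADATA, "events", 0)
puts "#{broker.host}:#{broker.port}" # prints kafka2.example:9092
```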

Instance Method Details

#close ⇒ Nil

Close the connection to the Kafka broker.

Returns:

  • (Nil)

# File 'lib/poseidon/partition_consumer.rb', line 147

def close
  @connection.close
  nil
end
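Because #close releases the broker connection, it is worth calling even when processing raises. A common Ruby pattern is an `ensure` clause; the sketch below uses a hypothetical stand-in class (`FakeConsumer`) in place of a real PartitionConsumer so it runs without a broker.

```ruby
# Hypothetical stand-in for PartitionConsumer: records whether close was called.
class FakeConsumer
  attr_reader :closed

  def initialize
    @closed = false
  end

  def fetch
    raise "processing failed" # simulate an error while consuming
  end

  def close
    @closed = true
    nil # PartitionConsumer#close also returns nil
  end
end

def consume_once(consumer)
  consumer.fetch
ensure
  # Runs whether fetch returned normally or raised.
  consumer.close
end

consumer = FakeConsumer.new
begin
  consume_once(consumer)
rescue RuntimeError => e
  puts "error: #{e.message}"
end
puts consumer.closed # prints true
```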

#fetch(options = {}) ⇒ Object

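Fetch messages from the broker, starting at the consumer's current offset. Returns an Array of FetchedMessage objects and advances the offset to one past the last message returned. Any option not given falls back to the value set when the consumer was created; passing an option other than those listed below raises ArgumentError. A broker error is raised as the corresponding error from Errors::ERROR_CODES, except that if a negative starting offset turns out to be out of range, the consumer falls back to :earliest_offset and retries.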

Parameters:

  • options (Hash) (defaults to: {})

Options Hash (options):

  • :max_bytes (Integer)

    Maximum number of bytes to fetch

  • :max_wait_ms (Integer)

    How long to block until the server sends us data.

  • :min_bytes (Integer)

    Smallest amount of data the server should send us.


# File 'lib/poseidon/partition_consumer.rb', line 100

def fetch(options = {})
  fetch_max_wait = options.delete(:max_wait_ms) || max_wait_ms
  fetch_max_bytes = options.delete(:max_bytes) || max_bytes
  fetch_min_bytes = options.delete(:min_bytes) || min_bytes

  if options.keys.any?
    raise ArgumentError, "Unknown options: #{options.keys.inspect}"
  end

  topic_fetches = build_topic_fetch_request(fetch_max_bytes)
  fetch_response = @connection.fetch(fetch_max_wait, fetch_min_bytes, topic_fetches)
  topic_response = fetch_response.topic_fetch_responses.first 
  partition_response = topic_response.partition_fetch_responses.first

  unless partition_response.error == Errors::NO_ERROR_CODE
    if @offset < 0 &&
      Errors::ERROR_CODES[partition_response.error] == Errors::OffsetOutOfRange
      @offset = :earliest_offset
      return fetch(options)
    end

    raise Errors::ERROR_CODES[partition_response.error]
  else
    @highwater_mark = partition_response.highwater_mark_offset
    messages = partition_response.message_set.flatten.map do |m|
      FetchedMessage.new(topic_response.topic, m.value, m.key, m.offset)
    end
    if messages.any?
      @offset = messages.last.offset + 1
    end
    messages
  end
end
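The bookkeeping in #fetch (record the highwater mark, advance the offset to one past the last message returned, and fall back to the earliest offset when a negative offset is out of range) can be exercised without a broker. The sketch below is a simplified, hypothetical model: an in-memory array stands in for the partition log, messages are counted rather than measured in bytes, the highwater mark is taken to be the log-end offset, and `Message` / `SimulatedPartitionConsumer` are illustrative names, not Poseidon classes.

```ruby
# Hypothetical message type: only the fields this sketch needs.
Message = Struct.new(:offset, :value)

# Hypothetical in-memory model of the offset bookkeeping in #fetch.
class SimulatedPartitionConsumer
  attr_reader :offset, :highwater_mark

  def initialize(log, offset, max_messages: 2)
    @log = log                   # Array of Message, sorted by offset
    @offset = offset
    @max_messages = max_messages # stands in for :max_bytes
  end

  def fetch
    if @offset < earliest_offset || @offset > log_end_offset
      # Like #fetch, only a negative offset falls back to the earliest offset.
      raise RangeError, "offset #{@offset} out of range" unless @offset < 0
      @offset = earliest_offset
      return fetch
    end

    @highwater_mark = log_end_offset
    messages = @log.select { |m| m.offset >= @offset }.first(@max_messages)
    # Advance past the last message returned, as #fetch does.
    @offset = messages.last.offset + 1 if messages.any?
    messages
  end

  private

  def earliest_offset
    @log.empty? ? 0 : @log.first.offset
  end

  # One past the last message (assumed highwater mark semantics).
  def log_end_offset
    @log.empty? ? 0 : @log.last.offset + 1
  end
end

log = (10..14).map { |i| Message.new(i, "msg-#{i}") }
consumer = SimulatedPartitionConsumer.new(log, -100, max_messages: 2)
loop do
  batch = consumer.fetch
  break if batch.empty?
  batch.each { |m| puts "#{m.offset}: #{m.value}" }
end
puts consumer.offset         # prints 15
puts consumer.highwater_mark # prints 15
```

Polling until an empty batch comes back is just one way to drain the partition in this model; a long-running consumer would typically keep calling #fetch and rely on :max_wait_ms to avoid a busy loop.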

#next_offset ⇒ Integer

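Returns the next offset that will be fetched. If the consumer was created with a special offset such as :earliest_offset that has not been resolved yet, it is resolved to a concrete offset first.

Returns:

  • (Integer)

    The next offset that will be fetched.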


# File 'lib/poseidon/partition_consumer.rb', line 137

def next_offset
  resolve_offset_if_necessary
  @offset
end