Class: Kafka::Consumer
Constant Summary collapse
- MAX_SIZE =
1 megabyte
1024 * 1024
- DEFAULT_POLLING_INTERVAL =
2 seconds
2
- MAX_OFFSETS =
1
- LATEST_OFFSET =
-1
- EARLIEST_OFFSET =
-2
Constants included from IO
Instance Attribute Summary collapse
-
#max_size ⇒ Object
Returns the value of attribute max_size.
-
#offset ⇒ Object
Returns the value of attribute offset.
-
#partition ⇒ Object
Returns the value of attribute partition.
-
#polling ⇒ Object
Returns the value of attribute polling.
-
#request_type ⇒ Object
Returns the value of attribute request_type.
-
#topic ⇒ Object
Returns the value of attribute topic.
Attributes included from IO
#compression, #host, #port, #socket
Instance Method Summary collapse
- #consume ⇒ Object
- #encode_request(request_type, topic, partition, offset, max_size) ⇒ Object
- #encoded_request_size ⇒ Object
- #fetch_latest_offset ⇒ Object
-
#initialize(options = {}) ⇒ Consumer
constructor
A new instance of Consumer.
- #loop(&block) ⇒ Object
- #read_data_response ⇒ Object
- #read_offsets_response ⇒ Object
- #send_consume_request ⇒ Object
- #send_offsets_request ⇒ Object
Methods included from IO
#connect, #disconnect, #read, #reconnect, #write
Constructor Details
#initialize(options = {}) ⇒ Consumer
Returns a new instance of Consumer.
29 30 31 32 33 34 35 36 37 38 |
# File 'lib/kafka/consumer.rb', line 29
# Builds a consumer from an options hash and immediately connects to the
# broker through #connect.
#
# @param options [Hash] optional settings
# @option options [String]  :topic     topic name (defaults to "test")
# @option options [Integer] :partition partition number (defaults to 0)
# @option options [String]  :host      broker host (defaults to HOST)
# @option options [Integer] :port      broker port (defaults to PORT)
# @option options [Integer] :offset    starting offset; left nil when absent,
#   in which case #consume resolves it through #fetch_latest_offset
# @option options [Integer] :max_size  value sent as the last field of each
#   fetch request (defaults to MAX_SIZE)
# @option options [Numeric] :polling   seconds #loop sleeps between polls
#   (defaults to DEFAULT_POLLING_INTERVAL)
def initialize(options = {})
  self.topic     = options[:topic]     || "test"
  self.partition = options[:partition] || 0
  self.host      = options[:host]      || HOST
  self.port      = options[:port]      || PORT
  # No default here on purpose: a nil offset is what triggers the lazy
  # lookup in #consume.
  self.offset    = options[:offset]
  self.max_size  = options[:max_size]  || MAX_SIZE
  self.polling   = options[:polling]   || DEFAULT_POLLING_INTERVAL
  connect(host, port)
end
Instance Attribute Details
#max_size ⇒ Object
Returns the value of attribute max_size.
27 28 29 |
# File 'lib/kafka/consumer.rb', line 27
# Reader for the max_size attribute. The constructor falls back to MAX_SIZE
# for it, and #send_consume_request passes it as the final field of the
# fetch request.
#
# @return [Object] the current value of @max_size
def max_size
  @max_size
end
#offset ⇒ Object
Returns the value of attribute offset.
27 28 29 |
# File 'lib/kafka/consumer.rb', line 27
# Reader for the offset attribute. It is nil until either the constructor
# receives :offset or #consume fills it in via #fetch_latest_offset;
# #consume then advances it after every fetch.
#
# @return [Object] the current value of @offset
def offset
  @offset
end
#partition ⇒ Object
Returns the value of attribute partition.
27 28 29 |
# File 'lib/kafka/consumer.rb', line 27
# Reader for the partition attribute. The constructor falls back to 0 for it,
# and both request senders pass it to #encode_request.
#
# @return [Object] the current value of @partition
def partition
  @partition
end
#polling ⇒ Object
Returns the value of attribute polling.
27 28 29 |
# File 'lib/kafka/consumer.rb', line 27
# Reader for the polling attribute: the value #loop hands to sleep between
# two consecutive #consume calls. The constructor falls back to
# DEFAULT_POLLING_INTERVAL for it.
#
# @return [Object] the current value of @polling
def polling
  @polling
end
#request_type ⇒ Object
Returns the value of attribute request_type.
27 28 29 |
# File 'lib/kafka/consumer.rb', line 27
# Reader for the request_type attribute.
# NOTE(review): nothing in the visible listings assigns this attribute; the
# request senders pass Kafka::RequestType constants directly instead.
#
# @return [Object] the current value of @request_type
def request_type
  @request_type
end
#topic ⇒ Object
Returns the value of attribute topic.
27 28 29 |
# File 'lib/kafka/consumer.rb', line 27
# Reader for the topic attribute. The constructor falls back to "test" for
# it; it is written into every request and counted by #encoded_request_size.
#
# @return [Object] the current value of @topic
def topic
  @topic
end
Instance Method Details
#consume ⇒ Object
49 50 51 52 53 54 55 56 57 |
# File 'lib/kafka/consumer.rb', line 49
# Performs one fetch round-trip: resolves the starting offset if none is set,
# sends a fetch request, parses the reply and advances the offset.
#
# NOTE(review): the extracted listing had lost the local variable's name and
# the trailing accessor; `message_set` / `message_set.messages` is a
# reconstruction -- confirm against the real source.
#
# @return [Object, nil] whatever the parsed set exposes as its messages, or
#   nil when a SocketError is raised anywhere in the exchange
def consume
  # Lazily look up the latest offset only when the caller did not supply one.
  self.offset ||= fetch_latest_offset
  send_consume_request
  message_set = Kafka::Message.parse_from(read_data_response)
  # Advance by the size reported by the parsed set so the next fetch resumes
  # after the data just read.
  self.offset += message_set.size
  message_set.messages
rescue SocketError
  nil
end
#encode_request(request_type, topic, partition, offset, max_size) ⇒ Object
90 91 92 93 94 95 96 97 |
# File 'lib/kafka/consumer.rb', line 90
# Serialises a request body in network byte order:
#
#   request_type  16-bit unsigned
#   topic length  16-bit unsigned, counted in BYTES
#   topic         raw bytes
#   partition     32-bit unsigned
#   offset        64-bit signed
#   max_size      32-bit unsigned
#
# @param request_type [Integer]
# @param topic [String]
# @param partition [Integer]
# @param offset [Integer] may be negative (e.g. LATEST_OFFSET)
# @param max_size [Integer]
# @return [String] the packed request as a binary string
def encode_request(request_type, topic, partition, offset, max_size)
  # One pack call keeps the result a single binary string. "q>" forces
  # big-endian regardless of host byte order (the former pack("q").reverse was
  # only right on little-endian machines), and bytesize keeps the length
  # prefix equal to the number of topic bytes actually written.
  [request_type, topic.bytesize, topic, partition, offset, max_size].pack("nna*Nq>N")
end
#encoded_request_size ⇒ Object
85 86 87 88 |
# File 'lib/kafka/consumer.rb', line 85
# Computes the length prefix that is written ahead of a request body.
#
# The sum mirrors the fields #encode_request writes: request type (2) +
# topic length (2) + topic bytes + partition (4) + offset (8) + max_size (4).
#
# @return [String] the size as a 4-byte big-endian unsigned integer
def encoded_request_size
  # bytesize, not length: the prefix must count bytes on the wire, which
  # differs from the character count for multibyte topic names.
  size = 2 + 2 + topic.bytesize + 4 + 8 + 4
  [size].pack("N")
end
#fetch_latest_offset ⇒ Object
59 60 61 62 |
# File 'lib/kafka/consumer.rb', line 59
# Asks the broker for the latest offset: sends an offsets request, then reads
# and decodes the reply.
#
# @return [Object] whatever #read_offsets_response returns
def fetch_latest_offset
  send_offsets_request
  read_offsets_response
end
#loop(&block) ⇒ Object
40 41 42 43 44 45 46 47 |
# File 'lib/kafka/consumer.rb', line 40
# Polls forever: calls #consume, hands any non-empty result to the block, then
# sleeps for `polling` seconds before the next round. Never returns on its own.
#
# @yield [messages] the value returned by #consume, only when it is neither
#   nil nor empty
def loop(&block)
  # Deliberately `while true` rather than `loop do`: inside this class a bare
  # `loop` call would resolve to this very method and recurse.
  while true
    messages = consume
    # #consume returns nil on SocketError, hence the nil guard before empty?.
    block.call(messages) if messages && !messages.empty?
    sleep(polling)
  end
end
#read_data_response ⇒ Object
78 79 80 81 82 83 |
# File 'lib/kafka/consumer.rb', line 78
# Reads one length-prefixed response: a 4-byte big-endian length followed by
# that many bytes of data.
#
# @return [String, nil] the data with its first two bytes removed
def read_data_response
  data_length = read(4).unpack("N").shift
  data = read(data_length)
  # TODO: inspect error code instead of skipping it
  data[2, data.length]
end
#read_offsets_response ⇒ Object
69 70 71 |
# File 'lib/kafka/consumer.rb', line 69
# Decodes an offset from an offsets reply: takes the 8 bytes that follow the
# first 4 bytes of #read_data_response and reads them as one signed 64-bit
# big-endian integer.
#
# @return [Integer] the decoded offset
def read_offsets_response
  # "q>" decodes big-endian on any host; the former reverse + unpack('q')
  # produced the right value only on little-endian machines.
  read_data_response[4, 8].unpack("q>").first
end
#send_consume_request ⇒ Object
73 74 75 76 |
# File 'lib/kafka/consumer.rb', line 73
# Writes a fetch request for the current topic, partition, offset and
# max_size: first the length prefix, then the encoded body.
def send_consume_request
  write(encoded_request_size)
  write(encode_request(Kafka::RequestType::FETCH, topic, partition, offset, max_size))
end
#send_offsets_request ⇒ Object
64 65 66 67 |
# File 'lib/kafka/consumer.rb', line 64
# Writes an offsets request for the current topic and partition: first the
# length prefix, then the encoded body. The offset field carries LATEST_OFFSET
# and the final field carries MAX_OFFSETS.
def send_offsets_request
  write(encoded_request_size)
  write(encode_request(Kafka::RequestType::OFFSETS, topic, partition, LATEST_OFFSET, MAX_OFFSETS))
end