Class: Kafka::Consumer
Constant Summary collapse
- MAX_SIZE =
1 megabyte
1024 * 1024
- DEFAULT_POLLING_INTERVAL =
2 seconds
2
- MAX_OFFSETS =
1
- LATEST_OFFSET =
-1
- EARLIEST_OFFSET =
-2
Instance Attribute Summary collapse
-
#max_size ⇒ Object
Returns the value of attribute max_size.
-
#offset ⇒ Object
Returns the value of attribute offset.
-
#partition ⇒ Object
Returns the value of attribute partition.
-
#polling ⇒ Object
Returns the value of attribute polling.
-
#request_type ⇒ Object
Returns the value of attribute request_type.
-
#topic ⇒ Object
Returns the value of attribute topic.
Attributes included from IO
Instance Method Summary collapse
- #consume ⇒ Object
- #encode_request(request_type, topic, partition, offset, max_size) ⇒ Object
- #encoded_request_size ⇒ Object
- #fetch_latest_offset ⇒ Object
-
#initialize(options = {}) ⇒ Consumer
constructor
A new instance of Consumer.
- #loop(&block) ⇒ Object
- #parse_message_set_from(data) ⇒ Object
- #read_data_response ⇒ Object
- #read_offsets_response ⇒ Object
- #send_consume_request ⇒ Object
- #send_offsets_request ⇒ Object
Methods included from IO
#connect, #disconnect, #read, #reconnect, #write
Constructor Details
#initialize(options = {}) ⇒ Consumer
Returns a new instance of Consumer.
29 30 31 32 33 34 35 36 37 38 |
# File 'lib/kafka/consumer.rb', line 29
#
# Configures the consumer from an options hash and opens the connection.
#
# @param options [Hash] optional settings; every key may be omitted
# @option options :topic     topic name (defaults to "test")
# @option options :partition partition number (defaults to 0)
# @option options :host      broker host (defaults to "localhost")
# @option options :port      broker port (defaults to 9092)
# @option options :offset    starting offset; stays nil when omitted, in
#   which case #consume fetches the latest offset on its first call
# @option options :max_size  value sent as max_size in fetch requests
#   (defaults to MAX_SIZE)
# @option options :polling   value passed to sleep by #loop
#   (defaults to DEFAULT_POLLING_INTERVAL)
def initialize(options = {})
  self.topic     = options[:topic]     || "test"
  self.partition = options[:partition] || 0
  self.host      = options[:host]      || "localhost"
  self.port      = options[:port]      || 9092
  self.offset    = options[:offset]
  self.max_size  = options[:max_size]  || MAX_SIZE
  self.polling   = options[:polling]   || DEFAULT_POLLING_INTERVAL
  connect(host, port)
end
Instance Attribute Details
#max_size ⇒ Object
Returns the value of attribute max_size.
27 28 29 |
# File 'lib/kafka/consumer.rb', line 27
#
# Reader for the max_size attribute.
#
# @return the current value of @max_size
def max_size
  @max_size
end
#offset ⇒ Object
Returns the value of attribute offset.
27 28 29 |
# File 'lib/kafka/consumer.rb', line 27
#
# Reader for the offset attribute.
#
# @return the current value of @offset
def offset
  @offset
end
#partition ⇒ Object
Returns the value of attribute partition.
27 28 29 |
# File 'lib/kafka/consumer.rb', line 27
#
# Reader for the partition attribute.
#
# @return the current value of @partition
def partition
  @partition
end
#polling ⇒ Object
Returns the value of attribute polling.
27 28 29 |
# File 'lib/kafka/consumer.rb', line 27
#
# Reader for the polling attribute.
#
# @return the current value of @polling
def polling
  @polling
end
#request_type ⇒ Object
Returns the value of attribute request_type.
27 28 29 |
# File 'lib/kafka/consumer.rb', line 27
#
# Reader for the request_type attribute.
#
# @return the current value of @request_type
def request_type
  @request_type
end
#topic ⇒ Object
Returns the value of attribute topic.
27 28 29 |
# File 'lib/kafka/consumer.rb', line 27
#
# Reader for the topic attribute.
#
# @return the current value of @topic
def topic
  @topic
end
Instance Method Details
#consume ⇒ Object
49 50 51 52 53 54 55 56 |
# File 'lib/kafka/consumer.rb', line 49
#
# Performs one fetch round trip: sends a consume request, reads the
# response and parses it into messages.
#
# When no offset has been set yet, the latest offset is fetched first and
# stored, so consumption starts from there.
#
# @return the result of #parse_message_set_from, or nil when a SocketError
#   is raised anywhere in the round trip
def consume
  self.offset ||= fetch_latest_offset
  send_consume_request
  data = read_data_response
  parse_message_set_from(data)
rescue SocketError
  # Deliberate best-effort: callers (see #loop) treat nil as "nothing read".
  nil
end
#encode_request(request_type, topic, partition, offset, max_size) ⇒ Object
89 90 91 92 93 94 95 96 |
# File 'lib/kafka/consumer.rb', line 89
#
# Serializes a request body as a binary string. Field layout, all in
# network (big-endian) byte order:
#
#   request_type  16-bit unsigned
#   topic length  16-bit unsigned (length in BYTES)
#   topic         raw bytes
#   partition     32-bit unsigned
#   offset        64-bit signed
#   max_size      32-bit unsigned
#
# @param request_type [Integer] numeric request type
# @param topic [String] topic name
# @param partition [Integer] partition number
# @param offset [Integer] offset (may be negative, e.g. LATEST_OFFSET)
# @param max_size [Integer] value for the trailing 32-bit field
# @return [String] the encoded request body
def encode_request(request_type, topic, partition, offset, max_size)
  # "q>" forces big-endian regardless of host byte order (the previous
  # pack("q").reverse was only correct on little-endian machines), and
  # bytesize keeps the length prefix right for multibyte topic names.
  [request_type, topic.bytesize, topic, partition, offset, max_size].pack("nna*Nq>N")
end
#encoded_request_size ⇒ Object
84 85 86 87 |
# File 'lib/kafka/consumer.rb', line 84
#
# Builds the 4-byte big-endian length prefix that precedes a request body.
#
# The size is the sum of the fields written by #encode_request:
# request type (2) + topic length (2) + topic bytes + partition (4) +
# offset (8) + max_size (4).
#
# @return [String] the size packed as a 32-bit big-endian unsigned integer
def encoded_request_size
  # bytesize, not length: the prefix must count bytes on the wire, which
  # differs from the character count for multibyte topic names.
  size = 2 + 2 + topic.bytesize + 4 + 8 + 4
  [size].pack("N")
end
#fetch_latest_offset ⇒ Object
58 59 60 61 |
# File 'lib/kafka/consumer.rb', line 58
#
# Asks for the latest offset and returns the answer.
#
# @return whatever #read_offsets_response returns
def fetch_latest_offset
  send_offsets_request   # request goes out first ...
  read_offsets_response  # ... then its reply is the return value
end
#loop(&block) ⇒ Object
40 41 42 43 44 45 46 47 |
# File 'lib/kafka/consumer.rb', line 40
#
# Polls forever: consumes, hands any non-empty result to the block, then
# sleeps for the polling interval before the next round.
#
# The block is skipped when #consume returns nil or an empty collection.
# This method never returns on its own; it ends only if something raises.
#
# @yield [messages] the non-empty result of one #consume call
def loop(&block)
  messages = []
  # NOTE: keep `while true` here. `loop do ... end` would call this very
  # method (it shadows Kernel#loop) and recurse without end.
  while true
    messages = consume
    block.call(messages) if messages && !messages.empty?
    sleep(polling)
  end
end
#parse_message_set_from(data) ⇒ Object
98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/kafka/consumer.rb', line 98
#
# Splits a fetched payload into individual messages and advances the
# consumer offset by the number of bytes consumed.
#
# Each entry in +data+ is read as a 4-byte big-endian length followed by
# that many bytes; the whole entry, length prefix included, is handed to
# Kafka::Message.parse_from. Parsing stops at the first entry that is cut
# short, and that partial entry is not counted towards the offset.
#
# @param data [String] payload as returned by #read_data_response
# @return [Array] the parsed messages, in payload order
def parse_message_set_from(data)
  messages = []
  processed = 0
  # Last index at which a full 4-byte length prefix can still start.
  length = data.length - 4
  while processed <= length
    message_size = data[processed, 4].unpack("N").shift + 4
    message_data = data[processed, message_size]
    # A short slice means the final message was truncated by max_size.
    break unless message_data.size == message_size
    messages << Kafka::Message.parse_from(message_data)
    processed += message_size
  end
  self.offset += processed
  messages
end
#read_data_response ⇒ Object
77 78 79 80 81 82 |
# File 'lib/kafka/consumer.rb', line 77
#
# Reads one length-prefixed response and returns its payload.
#
# The first 4 bytes read are a big-endian length; that many bytes are then
# read as the response, and its leading 2 bytes are dropped.
#
# @return [String] the response without its first 2 bytes
def read_data_response
  length_prefix = read(4)
  payload_size = length_prefix.unpack("N").first
  payload = read(payload_size)
  # TODO: inspect error code instead of skipping it
  payload[2..-1]
end
#read_offsets_response ⇒ Object
68 69 70 |
# File 'lib/kafka/consumer.rb', line 68
#
# Reads an offsets response and extracts one offset from it.
#
# Bytes 4..11 of the payload returned by #read_data_response are decoded
# as a signed 64-bit big-endian integer. The first 4 bytes are skipped
# (presumably the count of offsets in the reply; confirm against the
# protocol).
#
# @return [Integer] the decoded offset
def read_offsets_response
  # "q>" decodes big-endian on any host; the previous reverse + "q" was
  # only correct on little-endian machines.
  read_data_response[4, 8].unpack("q>").first
end
#send_consume_request ⇒ Object
72 73 74 75 |
# File 'lib/kafka/consumer.rb', line 72
#
# Writes a FETCH request for the current topic, partition, offset and
# max_size: first the length prefix, then the encoded request body.
def send_consume_request
  size_prefix = encoded_request_size
  write(size_prefix)

  request_body = encode_request(Kafka::RequestType::FETCH, topic, partition, offset, max_size)
  write(request_body)
end
#send_offsets_request ⇒ Object
63 64 65 66 |
# File 'lib/kafka/consumer.rb', line 63
#
# Writes an OFFSETS request for the current topic and partition, asking
# for LATEST_OFFSET with MAX_OFFSETS in the last field: first the length
# prefix, then the encoded request body.
def send_offsets_request
  size_prefix = encoded_request_size
  write(size_prefix)

  request_body = encode_request(Kafka::RequestType::OFFSETS, topic, partition, LATEST_OFFSET, MAX_OFFSETS)
  write(request_body)
end