Class: Kafka::Consumer
Constant Summary collapse
- CONSUME_REQUEST_TYPE =
Kafka::RequestType::FETCH
- MAX_SIZE =
1 MB
1048576
- DEFAULT_POLLING_INTERVAL =
2 seconds
2
Instance Attribute Summary collapse
-
#max_size ⇒ Object
Returns the value of attribute max_size.
-
#offset ⇒ Object
Returns the value of attribute offset.
-
#partition ⇒ Object
Returns the value of attribute partition.
-
#polling ⇒ Object
Returns the value of attribute polling.
-
#request_type ⇒ Object
Returns the value of attribute request_type.
-
#topic ⇒ Object
Returns the value of attribute topic.
Attributes included from IO
Instance Method Summary collapse
- #consume ⇒ Object
- #encode_request(request_type, topic, partition, offset, max_size) ⇒ Object
- #encode_request_size ⇒ Object
-
#initialize(options = {}) ⇒ Consumer
constructor
A new instance of Consumer.
- #loop(&block) ⇒ Object
- #parse_message_set_from(data) ⇒ Object
- #read_data_response ⇒ Object
-
#request_size ⇒ Object
Size of the request body in bytes: REQUEST TYPE ID (2) + TOPIC LENGTH (2) + TOPIC + PARTITION (4) + OFFSET (8) + MAX SIZE (4).
- #send_consume_request ⇒ Object
Methods included from IO
#connect, #disconnect, #read, #reconnect, #write
Constructor Details
#initialize(options = {}) ⇒ Consumer
Returns a new instance of Consumer.
12 13 14 15 16 17 18 19 20 21 22 |
# File 'lib/kafka/consumer.rb', line 12
#
# Builds a consumer and immediately connects to the broker.
#
# Every setting is read from +options+ and falls back to a default when the
# key is missing (or its value is nil/false):
#
# @param options [Hash]
# @option options [String]  :topic        ("test")
# @option options [Integer] :partition    (0)
# @option options [String]  :host         ("localhost")
# @option options [Integer] :port         (9092)
# @option options [Integer] :offset       (0)
# @option options [Integer] :max_size     (MAX_SIZE)
# @option options [Object]  :request_type (CONSUME_REQUEST_TYPE)
# @option options [Numeric] :polling      (DEFAULT_POLLING_INTERVAL) passed to
#   +sleep+ between polls in #loop
def initialize(options = {})
  self.topic        = options[:topic]        || "test"
  self.partition    = options[:partition]    || 0
  self.host         = options[:host]         || "localhost"
  self.port         = options[:port]         || 9092
  self.offset       = options[:offset]       || 0
  self.max_size     = options[:max_size]     || MAX_SIZE
  self.request_type = options[:request_type] || CONSUME_REQUEST_TYPE
  self.polling      = options[:polling]      || DEFAULT_POLLING_INTERVAL
  # The connection is opened as part of construction.
  connect(host, port)
end
Instance Attribute Details
#max_size ⇒ Object
Returns the value of attribute max_size.
10 11 12 |
# File 'lib/kafka/consumer.rb', line 10
#
# Returns the value of attribute max_size.
#
# @return [Object] whatever was last assigned to @max_size (nil if unset)
def max_size
  @max_size
end
#offset ⇒ Object
Returns the value of attribute offset.
10 11 12 |
# File 'lib/kafka/consumer.rb', line 10
#
# Returns the value of attribute offset.
#
# @return [Object] whatever was last assigned to @offset (nil if unset)
def offset
  @offset
end
#partition ⇒ Object
Returns the value of attribute partition.
10 11 12 |
# File 'lib/kafka/consumer.rb', line 10
#
# Returns the value of attribute partition.
#
# @return [Object] whatever was last assigned to @partition (nil if unset)
def partition
  @partition
end
#polling ⇒ Object
Returns the value of attribute polling.
10 11 12 |
# File 'lib/kafka/consumer.rb', line 10
#
# Returns the value of attribute polling.
#
# @return [Object] whatever was last assigned to @polling (nil if unset)
def polling
  @polling
end
#request_type ⇒ Object
Returns the value of attribute request_type.
10 11 12 |
# File 'lib/kafka/consumer.rb', line 10
#
# Returns the value of attribute request_type.
#
# @return [Object] whatever was last assigned to @request_type (nil if unset)
def request_type
  @request_type
end
#topic ⇒ Object
Returns the value of attribute topic.
10 11 12 |
# File 'lib/kafka/consumer.rb', line 10
#
# Returns the value of attribute topic.
#
# @return [Object] whatever was last assigned to @topic (nil if unset)
def topic
  @topic
end
Instance Method Details
#consume ⇒ Object
43 44 45 46 47 |
# File 'lib/kafka/consumer.rb', line 43
#
# Performs one fetch round trip: sends the consume request, reads the raw
# response, and hands it to #parse_message_set_from.
#
# @return [Object] whatever #parse_message_set_from returns for the response
def consume
  send_consume_request                # request data
  data = read_data_response           # read data response
  parse_message_set_from(data)        # parse message set
end
#encode_request(request_type, topic, partition, offset, max_size) ⇒ Object
33 34 35 36 37 38 39 40 41 |
# File 'lib/kafka/consumer.rb', line 33
#
# Serializes the request body as a binary string, all integers big-endian:
#
#   request_type (16-bit) | topic length (16-bit) | topic bytes |
#   partition (32-bit)    | offset (64-bit)       | max_size (32-bit)
#
# @param request_type [Integer]
# @param topic        [String]
# @param partition    [Integer]
# @param offset       [Integer]
# @param max_size     [Integer]
# @return [String] the encoded request (binary encoding)
def encode_request(request_type, topic, partition, offset, max_size)
  # Work on raw bytes: the length prefix must count bytes, not characters,
  # and mixing a non-ASCII UTF-8 topic with binary fields would otherwise
  # raise Encoding::CompatibilityError on concatenation.
  topic_bytes = topic.b

  [request_type].pack("n") +
    [topic_bytes.bytesize].pack("n") + topic_bytes +
    [partition].pack("N") +
    [offset].pack("Q>") + # explicit big-endian, independent of host byte order
    [max_size].pack("N")
end
#encode_request_size ⇒ Object
29 30 31 |
# File 'lib/kafka/consumer.rb', line 29
#
# Encodes #request_size as a 32-bit big-endian unsigned integer.
#
# @return [String] 4-byte binary string
def encode_request_size
  [request_size].pack("N")
end
#loop(&block) ⇒ Object
49 50 51 52 53 54 55 56 |
# File 'lib/kafka/consumer.rb', line 49
#
# Polls forever: calls #consume, passes each non-empty result to the block,
# then sleeps for #polling before the next round. Never returns on its own.
#
# NOTE(review): the local variable name was lost in extraction; `messages`
# is a reconstruction.
#
# @yieldparam messages [Object] the non-empty result of one #consume call
def loop(&block)
  # This method shadows Kernel#loop, so `loop do ... end` here would recurse
  # into this very method; an explicit `while true` is required.
  while true
    messages = consume
    block.call(messages) if messages && !messages.empty?
    sleep(polling)
  end
end
#parse_message_set_from(data) ⇒ Object
69 70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/kafka/consumer.rb', line 69
#
# Splits a message set into messages. Each message is framed as a 4-byte
# big-endian size followed by that many bytes; the whole frame (size prefix
# included) is handed to Kafka::Message.parse_from. Afterwards #offset is
# advanced by the number of bytes consumed.
#
# A trailing frame that is cut short is left unparsed and not counted, so the
# offset stops at the last complete message.
#
# NOTE(review): local names and the final return expression were lost in
# extraction and are reconstructed — confirm against the original source.
#
# @param data [String] message set bytes (assumed binary, as read from the socket)
# @return [Array] results of Kafka::Message.parse_from, in order
def parse_message_set_from(data)
  messages  = []
  processed = 0
  total     = data.length

  # Need at least the 4-byte size prefix to continue.
  while processed + 4 <= total
    message_size = data[processed, 4].unpack("N").first
    frame_size   = 4 + message_size
    # Truncated trailing message: stop before it.
    break if processed + frame_size > total

    messages << Kafka::Message.parse_from(data[processed, frame_size])
    processed += frame_size
  end

  self.offset += processed
  messages
end
#read_data_response ⇒ Object
58 59 60 61 62 |
# File 'lib/kafka/consumer.rb', line 58
#
# Reads one response from the socket: a 4-byte big-endian length, then that
# many bytes. The first 2 bytes of the payload are skipped and the remainder
# is returned.
#
# @return [String, nil] payload without its 2 leading bytes
def read_data_response
  data_length = socket.read(4).unpack("N").first # read length
  data = socket.read(data_length)                # read message set
  data[2, data.length]                           # we start with a 2 byte offset
end
#request_size ⇒ Object
REQUEST TYPE ID + TOPIC LENGTH + TOPIC + PARTITION + OFFSET + MAX SIZE
25 26 27 |
# File 'lib/kafka/consumer.rb', line 25
#
# Size of the request body in bytes:
# REQUEST TYPE ID + TOPIC LENGTH + TOPIC + PARTITION + OFFSET + MAX SIZE
#
# @return [Integer]
def request_size
  # bytesize, not length: a multibyte topic occupies more bytes than characters.
  2 + 2 + topic.bytesize + 4 + 8 + 4
end
#send_consume_request ⇒ Object
64 65 66 67 |
# File 'lib/kafka/consumer.rb', line 64
#
# Writes the request to the connection in two steps: first the encoded
# request size, then the encoded request built from the consumer's current
# request_type, topic, partition, offset and max_size.
def send_consume_request
  write(encode_request_size) # write request_size
  write(encode_request(request_type, topic, partition, offset, max_size)) # write request
end