Class: Kafka::Consumer

Inherits:
Object
  • Object
show all
Includes:
IO
Defined in:
lib/kafka/consumer.rb

Constant Summary collapse

CONSUME_REQUEST_TYPE =
Kafka::RequestType::FETCH
MAX_SIZE =

1 MB

1048576
DEFAULT_POLLING_INTERVAL =

2 seconds

2

Instance Attribute Summary collapse

Attributes included from IO

#host, #port, #socket

Instance Method Summary collapse

Methods included from IO

#connect, #disconnect, #read, #reconnect, #write

Constructor Details

#initialize(options = {}) ⇒ Consumer

Returns a new instance of Consumer.



12
13
14
15
16
17
18
19
20
21
22
# File 'lib/kafka/consumer.rb', line 12

# Configures the consumer from +options+ and then calls connect with the
# resolved host and port.
#
# Every setting is read from +options+; a missing, nil or false entry falls
# back to the default shown in the table below. Settings are applied through
# their writer methods, in table order.
#
# @param options [Hash] optional overrides keyed by :topic, :partition,
#   :host, :port, :offset, :max_size, :request_type and :polling
def initialize(options = {})
  fallbacks = [
    [:topic,        "test"],
    [:partition,    0],
    [:host,         "localhost"],
    [:port,         9092],
    [:offset,       0],
    [:max_size,     MAX_SIZE],
    [:request_type, CONSUME_REQUEST_TYPE],
    [:polling,      DEFAULT_POLLING_INTERVAL]
  ]

  fallbacks.each do |setting, fallback|
    __send__("#{setting}=", options[setting] || fallback)
  end

  connect(host, port)
end

Instance Attribute Details

#max_size ⇒ Object

Returns the value of attribute max_size.



10
11
12
# File 'lib/kafka/consumer.rb', line 10

# Reader for @max_size.
#
# initialize sets this from options[:max_size], defaulting to MAX_SIZE, and
# send_consume_request passes it to encode_request as the final request field.
# NOTE(review): presumably the upper bound, in bytes, on a fetch response —
# confirm against the broker protocol.
def max_size
  @max_size
end

#offset ⇒ Object

Returns the value of attribute offset.



10
11
12
# File 'lib/kafka/consumer.rb', line 10

# Reader for @offset.
#
# initialize sets this from options[:offset], defaulting to 0.
# parse_message_set_from adds the number of bytes it processed to it, and
# send_consume_request passes the current value to encode_request.
def offset
  @offset
end

#partition ⇒ Object

Returns the value of attribute partition.



10
11
12
# File 'lib/kafka/consumer.rb', line 10

# Reader for @partition.
#
# initialize sets this from options[:partition], defaulting to 0, and
# send_consume_request passes it to encode_request.
def partition
  @partition
end

#polling ⇒ Object

Returns the value of attribute polling.



10
11
12
# File 'lib/kafka/consumer.rb', line 10

# Reader for @polling.
#
# initialize sets this from options[:polling], defaulting to
# DEFAULT_POLLING_INTERVAL. The loop method passes it to sleep between
# successive consume calls, so it is a duration in seconds.
def polling
  @polling
end

#request_type ⇒ Object

Returns the value of attribute request_type.



10
11
12
# File 'lib/kafka/consumer.rb', line 10

# Reader for @request_type.
#
# initialize sets this from options[:request_type], defaulting to
# CONSUME_REQUEST_TYPE, and send_consume_request passes it to encode_request
# as the first request field.
def request_type
  @request_type
end

#topic ⇒ Object

Returns the value of attribute topic.



10
11
12
# File 'lib/kafka/consumer.rb', line 10

# Reader for @topic.
#
# initialize sets this from options[:topic], defaulting to "test". The value
# contributes to request_size and is embedded in the request built by
# encode_request.
def topic
  @topic
end

Instance Method Details

#consume ⇒ Object



43
44
45
46
47
# File 'lib/kafka/consumer.rb', line 43

# Performs one request/response round trip: sends the consume request, reads
# the response payload, and returns whatever parse_message_set_from produces
# for that payload.
def consume
  send_consume_request
  parse_message_set_from(read_data_response)
end

#encode_request(request_type, topic, partition, offset, max_size) ⇒ Object



33
34
35
36
37
38
39
40
41
# File 'lib/kafka/consumer.rb', line 33

# Serializes the body of a consume request (everything that follows the
# 4-byte size prefix written by send_consume_request).
#
# Field layout, all integers big-endian:
#   request_type   2 bytes
#   topic length   2 bytes, counted in bytes (not characters)
#   topic          raw bytes
#   partition      4 bytes
#   offset         8 bytes
#   max_size       4 bytes
#
# @param request_type [Integer] request type id
# @param topic [String] topic name
# @param partition [Integer] partition number
# @param offset [Integer] offset to start reading from
# @param max_size [Integer] value of the max_size field
# @return [String] the encoded request as a binary string
def encode_request(request_type, topic, partition, offset, max_size)
  # A single pack call always yields a binary string, so a topic containing
  # non-ASCII characters cannot trigger Encoding::CompatibilityError the way
  # String#+ between UTF-8 and binary fragments can.
  # "Q>" is big-endian on every host; reversing a native-endian "Q" is only
  # correct on little-endian machines.
  [request_type, topic.bytesize, topic, partition, offset, max_size].pack("nna*NQ>N")
end

#encode_request_size ⇒ Object



29
30
31
# File 'lib/kafka/consumer.rb', line 29

# Encodes request_size as a 4-byte big-endian unsigned integer, the form in
# which send_consume_request writes it ahead of the request body.
#
# @return [String] four-byte binary string
def encode_request_size
  size = request_size
  [size].pack("N")
end

#loop(&block) ⇒ Object



49
50
51
52
53
54
55
56
# File 'lib/kafka/consumer.rb', line 49

# Polls forever: consumes, hands any non-empty batch to the block, then
# sleeps for the polling interval before the next round. Never returns on
# its own; it only ends if something raises or throws out of it.
#
# @yieldparam messages the non-empty result of one consume call
def loop(&block)
  # This method shadows Kernel#loop, so a bare `loop do` here would recurse
  # into this method; a plain while loop is required.
  while true
    batch = consume
    if batch
      block.call(batch) unless batch.empty?
    end
    sleep(polling)
  end
end

#parse_message_set_from(data) ⇒ Object



69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/kafka/consumer.rb', line 69

# Splits a response payload into messages and advances offset past the
# bytes that were fully consumed.
#
# The payload is a sequence of records, each a 4-byte big-endian size
# followed by that many bytes. Every complete record (size header included)
# is handed to Kafka::Message.parse_from.
#
# A record whose size header claims more bytes than remain in +data+ is
# treated as truncated: parsing stops there, the record is not returned, and
# offset is not advanced over it, so the next request fetches it again.
#
# @param data [String, nil] payload bytes; nil is treated as an empty payload
# @return [Array] parsed messages, in payload order
def parse_message_set_from(data)
  messages = []
  return messages if data.nil?

  processed = 0
  total = data.bytesize

  # At least a full 4-byte size header must remain to start another record.
  while processed + 4 <= total
    record_size = data.byteslice(processed, 4).unpack("N").first + 4
    # Stop before a partial trailing record instead of parsing a fragment
    # and skipping the rest of it on the next fetch.
    break if processed + record_size > total

    messages << Kafka::Message.parse_from(data.byteslice(processed, record_size))
    processed += record_size
  end

  self.offset += processed
  messages
end

#read_data_response ⇒ Object



58
59
60
61
62
# File 'lib/kafka/consumer.rb', line 58

# Reads one response from the socket: a 4-byte big-endian length, then that
# many bytes. Returns the bytes after the first two.
# NOTE(review): the two skipped bytes are presumably an error code that is
# never inspected — confirm against the protocol.
#
# @return [String, nil] response bytes after the first two; nil when fewer
#   than two bytes were read
def read_data_response
  length_prefix = socket.read(4)
  payload_size = length_prefix.unpack("N").first
  payload = socket.read(payload_size)
  payload[2..-1]
end

#request_size ⇒ Object

REQUEST TYPE ID + TOPIC LENGTH + TOPIC + PARTITION + OFFSET + MAX SIZE



25
26
27
# File 'lib/kafka/consumer.rb', line 25

# Size in bytes of the encoded request body:
# REQUEST TYPE ID + TOPIC LENGTH + TOPIC + PARTITION + OFFSET + MAX SIZE
#
# The topic is measured with bytesize rather than length, because the size
# goes on the wire as a byte count and a topic with multibyte characters
# occupies more bytes than it has characters.
#
# @return [Integer] number of bytes in the request body
def request_size
  2 + 2 + topic.bytesize + 4 + 8 + 4
end

#send_consume_request ⇒ Object



64
65
66
67
# File 'lib/kafka/consumer.rb', line 64

# Writes a consume request in two steps: first the encoded size prefix, then
# the request body built from the current request_type, topic, partition,
# offset and max_size. The body is only encoded after the prefix has been
# written. Returns the result of the second write.
def send_consume_request
  write(encode_request_size)
  request_body = encode_request(request_type, topic, partition, offset, max_size)
  write(request_body)
end