Class: Kafka::Consumer

Inherits:
Object
  • Object
show all
Includes:
IO
Defined in:
lib/kafka/consumer.rb

Constant Summary collapse

MAX_SIZE =

1 megabyte

1024 * 1024
DEFAULT_POLLING_INTERVAL =

2 seconds

2
MAX_OFFSETS =
1
LATEST_OFFSET =
-1
EARLIEST_OFFSET =
-2

Constants included from IO

IO::HOST, IO::PORT

Instance Attribute Summary collapse

Attributes included from IO

#compression, #host, #port, #socket

Instance Method Summary collapse

Methods included from IO

#connect, #disconnect, #read, #reconnect, #write

Constructor Details

#initialize(options = {}) ⇒ Consumer

Returns a new instance of Consumer.



29
30
31
32
33
34
35
36
37
38
# File 'lib/kafka/consumer.rb', line 29

# Builds a consumer from an options hash and opens the connection.
#
# Recognised keys (each falls back to the listed default when absent or falsy):
#   :topic     - "test"
#   :partition - 0
#   :host      - HOST
#   :port      - PORT
#   :max_size  - MAX_SIZE
#   :polling   - DEFAULT_POLLING_INTERVAL
# :offset has no default and is stored exactly as given; when it is nil,
# #consume asks the broker for the latest offset on its first call.
def initialize(options = {})
  setting = lambda { |key, fallback| options[key] || fallback }

  self.topic     = setting.call(:topic, "test")
  self.partition = setting.call(:partition, 0)
  self.host      = setting.call(:host, HOST)
  self.port      = setting.call(:port, PORT)
  self.offset    = options[:offset]
  self.max_size  = setting.call(:max_size, MAX_SIZE)
  self.polling   = setting.call(:polling, DEFAULT_POLLING_INTERVAL)

  connect(host, port)
end

Instance Attribute Details

#max_size ⇒ Object

Returns the value of attribute max_size.



27
28
29
# File 'lib/kafka/consumer.rb', line 27

# Reader for the max_size attribute.
# #initialize sets it from options[:max_size], falling back to MAX_SIZE, and
# #send_consume_request passes it as the max_size field of each FETCH request.
# NOTE(review): presumably a byte count (MAX_SIZE is 1024 * 1024) - confirm.
def max_size
  @max_size
end

#offset ⇒ Object

Returns the value of attribute offset.



27
28
29
# File 'lib/kafka/consumer.rb', line 27

# Reader for the offset attribute.
# #initialize stores options[:offset] as given (it may be nil). #consume
# replaces a nil offset with the result of #fetch_latest_offset and then
# advances it by the size of every message set it reads.
def offset
  @offset
end

#partition ⇒ Object

Returns the value of attribute partition.



27
28
29
# File 'lib/kafka/consumer.rb', line 27

# Reader for the partition attribute.
# #initialize sets it from options[:partition], falling back to 0; it is
# passed as the partition field of both FETCH and OFFSETS requests.
def partition
  @partition
end

#polling ⇒ Object

Returns the value of attribute polling.



27
28
29
# File 'lib/kafka/consumer.rb', line 27

# Reader for the polling attribute.
# #initialize sets it from options[:polling], falling back to
# DEFAULT_POLLING_INTERVAL; #loop passes it to sleep between consume calls,
# so it is a duration in seconds.
def polling
  @polling
end

#request_type ⇒ Object

Returns the value of attribute request_type.



27
28
29
# File 'lib/kafka/consumer.rb', line 27

# Reader for the request_type attribute.
# NOTE(review): nothing in the visible code assigns this attribute - the
# request type put on the wire is passed explicitly to #encode_request - so
# this returns nil unless a caller sets it. Possibly unused; confirm.
def request_type
  @request_type
end

#topic ⇒ Object

Returns the value of attribute topic.



27
28
29
# File 'lib/kafka/consumer.rb', line 27

# Reader for the topic attribute.
# #initialize sets it from options[:topic], falling back to "test"; it is
# embedded (length-prefixed) in every request and its length feeds
# #encoded_request_size.
def topic
  @topic
end

Instance Method Details

#consume ⇒ Object



49
50
51
52
53
54
55
56
57
# File 'lib/kafka/consumer.rb', line 49

# Fetches the next batch of messages starting at the current offset.
#
# When no offset has been set yet, the latest offset is requested from the
# broker first. After a successful fetch the offset is advanced by the size
# reported by the parsed message set.
#
# Returns the messages of the parsed set, or nil when a SocketError is
# raised anywhere along the way (the offset is left unchanged in that case
# unless it had just been initialised).
def consume
  begin
    self.offset = fetch_latest_offset unless offset
    send_consume_request
    parsed_set = Kafka::Message.parse_from(read_data_response)
    self.offset = offset + parsed_set.size
    parsed_set.messages
  rescue SocketError
    nil
  end
end

#encode_request(request_type, topic, partition, offset, max_size) ⇒ Object



90
91
92
93
94
95
96
97
# File 'lib/kafka/consumer.rb', line 90

# Serialises a request body (everything after the 4-byte size prefix).
#
# Layout, all integers big-endian:
#   2 bytes  request_type (unsigned)
#   2 bytes  topic length in BYTES (unsigned)
#   n bytes  topic
#   4 bytes  partition (unsigned)
#   8 bytes  offset (signed, so the negative sentinel offsets encode correctly)
#   4 bytes  max_size (unsigned)
#
# The topic is measured with bytesize rather than length so that the prefix
# stays correct for multibyte topic names, and the offset uses the explicit
# big-endian directive instead of reversing a native-endian value, which was
# only right on little-endian hosts.
#
# Returns a binary (ASCII-8BIT) String.
def encode_request(request_type, topic, partition, offset, max_size)
  [request_type, topic.bytesize, topic, partition, offset, max_size].pack("nna*Nq>N")
end

#encoded_request_size ⇒ Object



85
86
87
88
# File 'lib/kafka/consumer.rb', line 85

# Builds the 4-byte big-endian size prefix that precedes a request body.
#
# The size is the byte count of the body produced by #encode_request:
# request type (2) + topic length field (2) + topic bytes + partition (4) +
# offset (8) + max size (4). The topic is measured in bytes, not characters,
# so the prefix matches what is actually sent for multibyte topic names.
def encoded_request_size
  size = 2 + 2 + topic.bytesize + 4 + 8 + 4
  [size].pack("N")
end

#fetch_latest_offset ⇒ Object



59
60
61
62
# File 'lib/kafka/consumer.rb', line 59

# Asks the broker for the latest offset of the configured topic/partition.
# Sends an OFFSETS request and returns whatever #read_offsets_response
# decodes from the reply. Used by #consume when no offset has been set.
def fetch_latest_offset
  send_offsets_request
  read_offsets_response
end

#loop {|messages| ... } ⇒ Object



40
41
42
43
44
45
46
47
# File 'lib/kafka/consumer.rb', line 40

# Polls forever: consumes a batch, hands it to the block when it is
# non-empty, then sleeps for the polling interval before trying again.
# Batches that are nil (e.g. after a socket error in #consume) or empty are
# skipped. This method never returns on its own.
#
# Note: this method shadows Kernel#loop inside the class, so the body uses a
# plain `while true` rather than `loop do`.
def loop(&block)
  while true
    batch = consume
    if batch
      block.call(batch) unless batch.empty?
    end
    sleep(polling)
  end
end

#read_data_response ⇒ Object



78
79
80
81
82
83
# File 'lib/kafka/consumer.rb', line 78

# Reads one length-prefixed response from the connection.
#
# The first 4 bytes are a big-endian unsigned length; that many bytes are
# then read as the response body. The leading 2 bytes of the body (the error
# code, per the TODO below) are dropped and the remainder is returned.
def read_data_response
  length_header = read(4)
  body_size = length_header.unpack("N").first
  body = read(body_size)
  # TODO: inspect error code instead of skipping it
  body[2..-1]
end

#read_offsets_response ⇒ Object



69
70
71
# File 'lib/kafka/consumer.rb', line 69

# Reads an OFFSETS response and returns the first offset it carries.
#
# Skips the first 4 bytes of the payload returned by #read_data_response
# (presumably the count of offsets that follow - confirm against the
# protocol) and decodes the next 8 bytes as a signed 64-bit big-endian
# integer. The explicit big-endian directive replaces the former
# reverse-then-native-unpack trick, which only worked on little-endian hosts.
def read_offsets_response
  read_data_response[4, 8].unpack("q>").first
end

#send_consume_request ⇒ Object



73
74
75
76
# File 'lib/kafka/consumer.rb', line 73

# Sends a FETCH request for the current topic, partition, offset and
# max_size: first the 4-byte size prefix, then the encoded body.
#
# The body is encoded BEFORE anything is written. If encoding fails (for
# example because the offset cannot be packed), nothing has been sent yet, so
# the connection is not left with a dangling size prefix and no body.
#
# Returns the result of the final write call.
def send_consume_request
  request = encode_request(Kafka::RequestType::FETCH, topic, partition, offset, max_size)
  write(encoded_request_size)
  write(request)
end

#send_offsets_request ⇒ Object



64
65
66
67
# File 'lib/kafka/consumer.rb', line 64

# Sends an OFFSETS request for the current topic and partition, asking for
# at most MAX_OFFSETS offsets relative to LATEST_OFFSET: first the 4-byte
# size prefix, then the encoded body. The body reuses #encode_request, with
# LATEST_OFFSET in the offset slot and MAX_OFFSETS in the max_size slot.
#
# The body is encoded BEFORE anything is written, so an encoding failure
# cannot leave a size prefix on the connection without its body.
#
# Returns the result of the final write call.
def send_offsets_request
  request = encode_request(Kafka::RequestType::OFFSETS, topic, partition, LATEST_OFFSET, MAX_OFFSETS)
  write(encoded_request_size)
  write(request)
end