Class: Kafka::Consumer

Inherits:
Object
  • Object
show all
Includes:
IO
Defined in:
lib/kafka/consumer.rb

Constant Summary collapse

MAX_SIZE =

1 megabyte

1024 * 1024
DEFAULT_POLLING_INTERVAL =

2 seconds

2
MAX_OFFSETS =
1
LATEST_OFFSET =
-1
EARLIEST_OFFSET =
-2

Instance Attribute Summary collapse

Attributes included from IO

#host, #port, #socket

Instance Method Summary collapse

Methods included from IO

#connect, #disconnect, #read, #reconnect, #write

Constructor Details

#initialize(options = {}) ⇒ Consumer

Returns a new instance of Consumer.



29
30
31
32
33
34
35
36
37
38
# File 'lib/kafka/consumer.rb', line 29

def initialize(options = {})
  self.topic        = options[:topic]     || "test"
  self.partition    = options[:partition] || 0
  self.host         = options[:host]      || "localhost"
  self.port         = options[:port]      || 9092
  self.offset       = options[:offset]
  self.max_size     = options[:max_size]  || MAX_SIZE
  self.polling      = options[:polling]   || DEFAULT_POLLING_INTERVAL
  connect(host, port)
end

Instance Attribute Details

#max_sizeObject

Returns the value of attribute max_size.



27
28
29
# File 'lib/kafka/consumer.rb', line 27

def max_size
  @max_size
end

#offsetObject

Returns the value of attribute offset.



27
28
29
# File 'lib/kafka/consumer.rb', line 27

def offset
  @offset
end

#partitionObject

Returns the value of attribute partition.



27
28
29
# File 'lib/kafka/consumer.rb', line 27

def partition
  @partition
end

#pollingObject

Returns the value of attribute polling.



27
28
29
# File 'lib/kafka/consumer.rb', line 27

def polling
  @polling
end

#request_typeObject

Returns the value of attribute request_type.



27
28
29
# File 'lib/kafka/consumer.rb', line 27

def request_type
  @request_type
end

#topicObject

Returns the value of attribute topic.



27
28
29
# File 'lib/kafka/consumer.rb', line 27

def topic
  @topic
end

Instance Method Details

#consumeObject



49
50
51
52
53
54
55
56
# File 'lib/kafka/consumer.rb', line 49

def consume
  self.offset ||= fetch_latest_offset
  send_consume_request
  data = read_data_response
  parse_message_set_from(data)
rescue SocketError
  nil
end

#encode_request(request_type, topic, partition, offset, max_size) ⇒ Object



89
90
91
92
93
94
95
96
# File 'lib/kafka/consumer.rb', line 89

def encode_request(request_type, topic, partition, offset, max_size)
  request_type = [request_type].pack("n")
  topic        = [topic.length].pack('n') + topic
  partition    = [partition].pack("N")
  offset       = [offset].pack("q").reverse # DIY 64bit big endian integer
  max_size     = [max_size].pack("N")
  request_type + topic + partition + offset + max_size
end

#encoded_request_sizeObject



84
85
86
87
# File 'lib/kafka/consumer.rb', line 84

def encoded_request_size
  size = 2 + 2 + topic.length + 4 + 8 + 4
  [size].pack("N")
end

#fetch_latest_offsetObject



58
59
60
61
# File 'lib/kafka/consumer.rb', line 58

def fetch_latest_offset
  send_offsets_request
  read_offsets_response
end

#loop(&block) ⇒ Object



40
41
42
43
44
45
46
47
# File 'lib/kafka/consumer.rb', line 40

def loop(&block)
  messages = []
  while (true) do
    messages = consume
    block.call(messages) if messages && !messages.empty?
    sleep(polling)
  end
end

#parse_message_set_from(data) ⇒ Object



98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/kafka/consumer.rb', line 98

def parse_message_set_from(data)
  messages = []
  processed = 0
  length = data.length - 4
  while (processed <= length) do
    message_size = data[processed, 4].unpack("N").shift + 4
    message_data = data[processed, message_size]
    break unless message_data.size == message_size
    messages << Kafka::Message.parse_from(message_data)
    processed += message_size
  end
  self.offset += processed
  messages
end

#read_data_responseObject



77
78
79
80
81
82
# File 'lib/kafka/consumer.rb', line 77

def read_data_response
  data_length = read(4).unpack("N").shift
  data = read(data_length)
  # TODO: inspect error code instead of skipping it
  data[2, data.length]
end

#read_offsets_responseObject



68
69
70
# File 'lib/kafka/consumer.rb', line 68

def read_offsets_response
  read_data_response[4,8].reverse.unpack('q')[0]
end

#send_consume_requestObject



72
73
74
75
# File 'lib/kafka/consumer.rb', line 72

def send_consume_request
  write(encoded_request_size)
  write(encode_request(Kafka::RequestType::FETCH, topic, partition, offset, max_size))
end

#send_offsets_requestObject



63
64
65
66
# File 'lib/kafka/consumer.rb', line 63

def send_offsets_request
  write(encoded_request_size)
  write(encode_request(Kafka::RequestType::OFFSETS, topic, partition, LATEST_OFFSET, MAX_OFFSETS))
end