Class: KafkaSyrup::PartitionConsumer

Inherits:
Object
  • Object
show all
Includes:
Utils
Defined in:
lib/kafka_syrup/partition_consumer.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Utils

#load_args, #log

Constructor Details

#initialize(*args) ⇒ PartitionConsumer

Returns a new instance of PartitionConsumer.



7
8
9
10
11
12
13
14
15
# File 'lib/kafka_syrup/partition_consumer.rb', line 7

# Builds a consumer for a single topic partition.
#
# The arguments are handed unchanged to load_args (mixed in from Utils).
# NOTE(review): load_args presumably copies the supplied options onto the
# attributes (topic, partition, offset, max_bytes, ...) - confirm in Utils.
#
# The partition's broker is resolved up front via refresh_metadata so that
# later requests have a broker to talk to; the message buffer, the control
# queue used to wake the fetcher, and the mutex are then created.
#
# NOTE(review): the refresh_metadata call was blanked out of this listing
# (every "metadata" token was stripped); it is restored here - confirm
# against lib/kafka_syrup/partition_consumer.rb line 10.
def initialize(*args)
  load_args(*args)

  refresh_metadata

  self.messages = Queue.new
  self.control_queue = Queue.new
  self.lock = Mutex.new
end

Instance Attribute Details

#broker ⇒ Object

Returns the value of attribute broker.



5
6
7
# File 'lib/kafka_syrup/partition_consumer.rb', line 5

# Reader for the broker attribute.
#
# refresh_metadata assigns it to the metadata broker whose node matches the
# partition leader; fetch_from_broker and get_available_offset send their
# requests through it.
def broker
  @broker
end

#control_queue ⇒ Object

Returns the value of attribute control_queue.



5
6
7
# File 'lib/kafka_syrup/partition_consumer.rb', line 5

# Reader for the control_queue attribute.
#
# The constructor assigns a Queue; #fetch pushes :fetch onto it when the
# message buffer is empty and a thread is waiting on the queue.
def control_queue
  @control_queue
end

#lock ⇒ Object

Returns the value of attribute lock.



5
6
7
# File 'lib/kafka_syrup/partition_consumer.rb', line 5

# Reader for the lock attribute.
#
# The constructor assigns a Mutex; fetch_from_broker wraps each of its offset
# assignments in lock.synchronize.
def lock
  @lock
end

#max_bytes ⇒ Object

Returns the value of attribute max_bytes.



5
6
7
# File 'lib/kafka_syrup/partition_consumer.rb', line 5

# Reader for the max_bytes attribute.
#
# Optional: when truthy, fetch_from_broker passes it to the FetchRequest as
# the :max_bytes option; otherwise no options hash is built.
def max_bytes
  @max_bytes
end

#messages ⇒ Object

Returns the value of attribute messages.



5
6
7
# File 'lib/kafka_syrup/partition_consumer.rb', line 5

# Reader for the messages attribute.
#
# The constructor assigns a Queue; #fetch pops its results from it.
# NOTE(review): presumably filled by the fetcher thread - the producer side
# is not visible in this listing.
def messages
  @messages
end

#offset ⇒ Object

Returns the value of attribute offset.



5
6
7
# File 'lib/kafka_syrup/partition_consumer.rb', line 5

# Reader for the offset attribute.
#
# fetch_from_broker uses it as the starting offset of each FetchRequest and
# replaces a non-numeric value with the result of get_available_offset.
# Both #fetch and fetch_from_broker advance it to "last offset + 1".
def offset
  @offset
end

#partition ⇒ Object

Returns the value of attribute partition.



5
6
7
# File 'lib/kafka_syrup/partition_consumer.rb', line 5

# Reader for the partition attribute.
#
# Passed to add_partition when building fetch and offset requests.
def partition
  @partition
end

#thread ⇒ Object

Returns the value of attribute thread.



5
6
7
# File 'lib/kafka_syrup/partition_consumer.rb', line 5

# Reader for the thread attribute.
#
# #fetch calls start_fetcher_thread whenever this is nil or false.
# NOTE(review): presumably holds the background fetcher Thread once started -
# start_fetcher_thread is not visible in this listing.
def thread
  @thread
end

#topic ⇒ Object

Returns the value of attribute topic.



5
6
7
# File 'lib/kafka_syrup/partition_consumer.rb', line 5

# Reader for the topic attribute.
#
# Passed to add_topic when building fetch and offset requests, and used for
# the metadata lookup in refresh_metadata.
def topic
  @topic
end

Instance Method Details

#fetch(limit = nil) ⇒ Object



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/kafka_syrup/partition_consumer.rb', line 52

# Returns buffered messages, blocking until at least one is available.
#
# limit - optional maximum number of messages to return; when nil, the buffer
#         is drained until it is observed empty.
#
# Starts the fetcher thread on first use, and nudges it with :fetch when the
# buffer is empty and a thread is parked on the control queue. Afterwards the
# consumer's offset is moved to one past the last message returned.
#
# Returns an Array of the popped messages (each is indexed with [:offset]).
def fetch(limit = nil)
  start_fetcher_thread unless thread

  if messages.empty? && control_queue.num_waiting > 0
    control_queue.push(:fetch)
  end

  # The first pop always happens (and blocks while the buffer is empty);
  # further pops continue only while messages remain and the limit is unmet.
  batch = [messages.pop]
  until messages.empty? || (limit && batch.count == limit)
    batch << messages.pop
  end

  newest = batch.last
  self.offset = newest[:offset] + 1

  batch
end

#fetch_from_broker(&block) ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/kafka_syrup/partition_consumer.rb', line 35

# Sends one FetchRequest for this topic/partition starting at the current
# offset, then advances the offset to one past the last message received.
#
# block - forwarded unchanged to broker.send_request.
#
# A non-numeric offset is first resolved to a concrete one through
# get_available_offset. NOTE(review): presumably a symbol such as :earliest
# or :latest, or nil - confirm against callers.
#
# When the broker answers with OffsetOutOfRange the offset is reset: to the
# earliest available offset if the current one is below it, otherwise to the
# latest. No retry is attempted here.
#
# All offset writes happen under lock.
def fetch_from_broker(&block)
  # Integer, not Fixnum: Fixnum was deprecated in Ruby 2.4 and removed in 3.2
  # (NameError), and on older Rubies it rejected Bignum offsets, which would
  # then have been silently reset by the lookup below.
  unless offset.is_a?(Integer)
    lock.synchronize { self.offset = get_available_offset(offset) }
  end

  # opts stays nil when max_bytes is unset, exactly as before.
  opts = { max_bytes: max_bytes } if max_bytes
  request = KafkaSyrup::Protocol::FetchRequest.new(opts)
  request.add_topic(topic).add_partition(partition, offset)

  response = partition_from_response(broker.send_request(request, &block))

  unless response.messages.empty?
    lock.synchronize { self.offset = response.messages.last.offset + 1 }
  end
rescue KafkaSyrup::KafkaResponseErrors::OffsetOutOfRange
  low = get_available_offset(:earliest)
  high = get_available_offset

  lock.synchronize { self.offset = offset < low ? low : high }
end

#get_available_offset(time = :latest) ⇒ Object



27
28
29
30
31
32
33
# File 'lib/kafka_syrup/partition_consumer.rb', line 27

# Asks the broker for an offset of this topic/partition.
#
# time - :earliest selects the earliest offset; any other value (including
#        the default :latest) selects the latest.
#
# Returns the last entry of the offsets list in the partition's response.
def get_available_offset(time = :latest)
  # The request takes a numeric sentinel: -2 for :earliest, -1 otherwise.
  sentinel = time == :earliest ? -2 : -1

  request = KafkaSyrup::Protocol::OffsetRequest.new
  request.add_topic(topic).add_partition(partition, sentinel)

  partition_from_response(broker.send_request(request)).offsets.last
end

#refresh_metadata ⇒ Object

Raises:
  • (BrokerNotFound)


17
18
19
20
21
22
23
24
25
# File 'lib/kafka_syrup/partition_consumer.rb', line 17

# Re-resolves which broker leads this partition.
#
# Closes the socket of the current broker (if there is one), looks up the
# topic's metadata, and selects the broker whose node equals the partition's
# leader. The selected broker is extended with
# KafkaSyrup::Broker::Communications so requests can be sent through it.
#
# Raises BrokerNotFound when no broker in the metadata matches the leader.
#
# NOTE(review): the method name and the KafkaSyrup lookup call were blanked
# out of this listing (every "metadata" token was stripped). The name comes
# from the member heading; `get_metadata` is inferred - confirm against
# lib/kafka_syrup.rb.
def refresh_metadata
  broker && broker.socket && broker.socket.close

  meta = KafkaSyrup.get_metadata(topic)

  self.broker = meta.brokers.detect{ |b| b.node == partition_from_response(meta).leader }
  raise BrokerNotFound unless self.broker
  self.broker.extend KafkaSyrup::Broker::Communications
end

#retry_backoff ⇒ Object



69
70
71
# File 'lib/kafka_syrup/partition_consumer.rb', line 69

# Returns the configured retry backoff divided by 1000.0, computed once and
# cached on the instance.
# NOTE(review): presumably converts milliseconds to seconds - confirm the
# unit of KafkaSyrup.config.retry_backoff.
def retry_backoff
  return @retry_backoff if @retry_backoff

  @retry_backoff = KafkaSyrup.config.retry_backoff / 1000.0
end