Class: KafkaSyrup::PartitionConsumer
- Inherits:
-
Object
- Object
- KafkaSyrup::PartitionConsumer
- Includes:
- Utils
- Defined in:
- lib/kafka_syrup/partition_consumer.rb
Instance Attribute Summary collapse
-
#broker ⇒ Object
Returns the value of attribute broker.
-
#control_queue ⇒ Object
Returns the value of attribute control_queue.
-
#lock ⇒ Object
Returns the value of attribute lock.
-
#max_bytes ⇒ Object
Returns the value of attribute max_bytes.
-
#messages ⇒ Object
Returns the value of attribute messages.
-
#offset ⇒ Object
Returns the value of attribute offset.
-
#partition ⇒ Object
Returns the value of attribute partition.
-
#thread ⇒ Object
Returns the value of attribute thread.
-
#topic ⇒ Object
Returns the value of attribute topic.
Instance Method Summary collapse
- #fetch(limit = nil) ⇒ Object
- #fetch_from_broker(&block) ⇒ Object
- #get_available_offset(time = :latest) ⇒ Object
-
#initialize(*args) ⇒ PartitionConsumer
constructor
A new instance of PartitionConsumer.
- #refresh_metadata ⇒ Object
- #retry_backoff ⇒ Object
Methods included from Utils
Constructor Details
#initialize(*args) ⇒ PartitionConsumer
Returns a new instance of PartitionConsumer.
7 8 9 10 11 12 13 14 15 |
# File 'lib/kafka_syrup/partition_consumer.rb', line 7 def initialize(*args) load_args(*args) self. = Queue.new self.control_queue = Queue.new self.lock = Mutex.new end |
Instance Attribute Details
#broker ⇒ Object
Returns the value of attribute broker.
5 6 7 |
# File 'lib/kafka_syrup/partition_consumer.rb', line 5 def broker @broker end |
#control_queue ⇒ Object
Returns the value of attribute control_queue.
5 6 7 |
# File 'lib/kafka_syrup/partition_consumer.rb', line 5 def control_queue @control_queue end |
#lock ⇒ Object
Returns the value of attribute lock.
5 6 7 |
# File 'lib/kafka_syrup/partition_consumer.rb', line 5 def lock @lock end |
#max_bytes ⇒ Object
Returns the value of attribute max_bytes.
5 6 7 |
# File 'lib/kafka_syrup/partition_consumer.rb', line 5 def max_bytes @max_bytes end |
#messages ⇒ Object
Returns the value of attribute messages.
5 6 7 |
# File 'lib/kafka_syrup/partition_consumer.rb', line 5 def @messages end |
#offset ⇒ Object
Returns the value of attribute offset.
5 6 7 |
# File 'lib/kafka_syrup/partition_consumer.rb', line 5 def offset @offset end |
#partition ⇒ Object
Returns the value of attribute partition.
5 6 7 |
# File 'lib/kafka_syrup/partition_consumer.rb', line 5 def partition @partition end |
#thread ⇒ Object
Returns the value of attribute thread.
5 6 7 |
# File 'lib/kafka_syrup/partition_consumer.rb', line 5 def thread @thread end |
#topic ⇒ Object
Returns the value of attribute topic.
5 6 7 |
# File 'lib/kafka_syrup/partition_consumer.rb', line 5 def topic @topic end |
Instance Method Details
#fetch(limit = nil) ⇒ Object
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/kafka_syrup/partition_consumer.rb', line 52 def fetch(limit = nil) start_fetcher_thread unless thread control_queue.push(:fetch) if .empty? && control_queue.num_waiting > 0 result = [] loop do result << .pop break if .empty? || (limit && result.count == limit) end self.offset = result.last[:offset] + 1 result end |
#fetch_from_broker(&block) ⇒ Object
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/kafka_syrup/partition_consumer.rb', line 35 def fetch_from_broker(&block) lock.synchronize{ self.offset = get_available_offset(offset) } unless offset.is_a?(Fixnum) opts = { max_bytes: max_bytes } if max_bytes request = KafkaSyrup::Protocol::FetchRequest.new(opts) request.add_topic(topic).add_partition(partition, offset) response = partition_from_response(broker.send_request(request, &block)) lock.synchronize{ self.offset = response..last.offset + 1 } unless response..empty? rescue KafkaSyrup::KafkaResponseErrors::OffsetOutOfRange low = get_available_offset(:earliest) high = get_available_offset lock.synchronize{ self.offset = offset < low ? low : high } end |
#get_available_offset(time = :latest) ⇒ Object
27 28 29 30 31 32 33 |
# File 'lib/kafka_syrup/partition_consumer.rb', line 27 def get_available_offset(time = :latest) request = KafkaSyrup::Protocol::OffsetRequest.new request.add_topic(topic).add_partition(partition, time == :earliest ? -2 : -1) response = broker.send_request(request) partition_from_response(response).offsets.last end |
#refresh_metadata ⇒ Object
17 18 19 20 21 22 23 24 25 |
# File 'lib/kafka_syrup/partition_consumer.rb', line 17 def broker && broker.socket && broker.socket.close = KafkaSyrup.(topic) self.broker = .brokers.detect{ |b| b.node == partition_from_response().leader } raise BrokerNotFound unless self.broker self.broker.extend KafkaSyrup::Broker::Communications end |
#retry_backoff ⇒ Object
69 70 71 |
# File 'lib/kafka_syrup/partition_consumer.rb', line 69 def retry_backoff @retry_backoff ||= KafkaSyrup.config.retry_backoff / 1000.0 end |