Class: Racecar::ConsumerSet
- Inherits: Object (ancestry: Object → Racecar::ConsumerSet)
- Defined in:
- lib/racecar/consumer_set.rb
Constant Summary
- MAX_POLL_TRIES = 10
Instance Method Summary
- #batch_poll(max_wait_time_ms = @config.max_wait_time_ms, max_messages = @config.fetch_messages) ⇒ Object
  batch_poll collects messages until any of the following occurs: max_wait_time_ms has passed, max_messages have been collected, or a nil message was polled (end of topic, Kafka stalled, etc.).
- #close ⇒ Object
- #commit ⇒ Object
- #current ⇒ Object
- #each_subscribed ⇒ Object (also: #each)
- #initialize(config, logger, instrumenter = NullInstrumenter) ⇒ ConsumerSet
  Constructor; returns a new instance of ConsumerSet.
- #pause(topic, partition, offset) ⇒ Object
- #poll(max_wait_time_ms = @config.max_wait_time_ms) ⇒ Object
- #resume(topic, partition) ⇒ Object
- #store_offset(message) ⇒ Object
- #subscribe_all ⇒ Object
  Subscribes to all topics eagerly, even if there are still messages elsewhere.
Constructor Details
#initialize(config, logger, instrumenter = NullInstrumenter) ⇒ ConsumerSet
Returns a new instance of ConsumerSet.
7 8 9 10 11 12 13 14 15 16 17 18 19 |
# File 'lib/racecar/consumer_set.rb', line 7
# Builds a consumer set for the subscriptions in +config+. No consumer is
# created here: @consumers starts empty and #current fills it on demand.
#
# @param config must respond to #subscriptions (non-empty collection)
# @param logger logger used by the other methods of this class
# @param instrumenter defaults to NullInstrumenter
# @raise [ArgumentError] when config.subscriptions is empty
def initialize(config, logger, instrumenter = NullInstrumenter)
  @config = config
  @logger = logger
  @instrumenter = instrumenter

  if @config.subscriptions.empty?
    raise ArgumentError, "Subscriptions must not be empty when subscribing"
  end

  @consumers = []
  # Endless rotation over subscription indexes; #current peeks at it.
  @consumer_id_iterator = (0...@config.subscriptions.size).cycle
  @previous_retries = 0
  @last_poll_read_nil_message = false
  # topic => { partition => [consumer, filtered_tpl] }, filled by #pause.
  @paused_tpls = Hash.new { |by_topic, topic| by_topic[topic] = {} }
end
Instance Method Details
#batch_poll(max_wait_time_ms = @config.max_wait_time_ms, max_messages = @config.fetch_messages) ⇒ Object
batch_poll collects messages until any of the following occurs:
- max_wait_time_ms has passed
- max_messages have been collected
- a nil message was polled (end of topic, Kafka stalled, etc.)
The messages are from a single topic, but potentially from more than one partition.
Any errors during polling are retried with exponential backoff. If an error occurs but there is no time left for a backoff and retry, the messages collected so far are returned and the retry happens on the next call.
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/racecar/consumer_set.rb', line 35
# Collects messages until +max_wait_time_ms+ has elapsed, +max_messages+
# messages have been collected, or a poll returns nil.
#
# The remaining time budget is recomputed before every poll and handed to
# poll_with_retries, so a single call never waits much past the budget.
#
# @param max_wait_time_ms [Integer] total time budget for this call, in ms
# @param max_messages [Integer] upper bound on the number of messages returned
# @return [Array] the collected messages (possibly empty)
def batch_poll(max_wait_time_ms = @config.max_wait_time_ms, max_messages = @config.fetch_messages)
  started_at = Time.now
  remain_ms = max_wait_time_ms
  # Helper not shown here; presumably may rotate to another subscription's
  # consumer before polling — TODO confirm.
  maybe_select_next_consumer
  messages = []

  while remain_ms > 0 && messages.size < max_messages
    remain_ms = remaining_time_ms(max_wait_time_ms, started_at)
    msg = poll_with_retries(remain_ms)
    # A nil poll ends the batch early (end of topic, Kafka stalled, etc.).
    break if msg.nil?

    messages << msg
  end

  messages
end
#close ⇒ Object
67 68 69 70 |
# File 'lib/racecar/consumer_set.rb', line 67
# Closes every consumer created so far and forgets all recorded pauses.
def close
  each_subscribed { |consumer| consumer.close }
  @paused_tpls.clear
end
#commit ⇒ Object
61 62 63 64 65 |
# File 'lib/racecar/consumer_set.rb', line 61
# Commits on every consumer created so far, delegating each one to
# commit_rescue_no_offset (defined elsewhere in this class).
def commit
  each_subscribed { |subscribed| commit_rescue_no_offset(subscribed) }
end
#current ⇒ Object
72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/racecar/consumer_set.rb', line 72
# Returns the consumer for the subscription slot the iterator currently
# points at. On first access to a slot it builds an Rdkafka consumer, wires
# up a RebalanceListener, subscribes to the slot's topic inside a
# 'join_group' instrumentation span, and caches it in @consumers.
def current
  slot = @consumer_id_iterator.peek
  existing = @consumers[slot]
  return existing if existing

  kafka_config = Rdkafka::Config.new(rdkafka_config(current_subscription))
  rebalance_listener = RebalanceListener.new(@config.consumer_class, @instrumenter)
  kafka_config.consumer_rebalance_listener = rebalance_listener
  consumer = kafka_config.consumer
  # The listener needs the consumer it is attached to.
  rebalance_listener.rdkafka_consumer = consumer

  @instrumenter.instrument('join_group') do
    consumer.subscribe current_subscription.topic
  end

  @consumers[slot] = consumer
end
#each_subscribed ⇒ Object Also known as: each
87 88 89 90 91 92 93 |
# File 'lib/racecar/consumer_set.rb', line 87
# Iterates over @consumers, the consumers #current has created so far.
#
# @yield [consumer] each entry of @consumers, in index order
# @return [Array, Enumerator] @consumers when a block is given,
#   otherwise an Enumerator over it
def each_subscribed(&block)
  return @consumers.each unless block

  @consumers.each(&block)
end
#pause(topic, partition, offset) ⇒ Object
95 96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/racecar/consumer_set.rb', line 95
# Pauses +topic+/+partition+ on the consumer that owns it and seeks that
# partition to +offset+. The consumer and its filtered_tpl are remembered in
# @paused_tpls so #resume can still find them later.
# Logs at info level and returns nil when no consumer owns the partition.
def pause(topic, partition, offset)
  consumer, filtered_tpl = find_consumer_by(topic, partition)

  unless consumer
    @logger.info "Attempted to pause #{topic}/#{partition}, but we're not subscribed to it"
    return
  end

  consumer.pause(filtered_tpl)
  # seek only reads topic/partition/offset from its argument, so a lightweight
  # stand-in for a real message is enough.
  seek_target = OpenStruct.new(topic: topic, partition: partition, offset: offset)
  consumer.seek(seek_target)
  @paused_tpls[topic][partition] = [consumer, filtered_tpl]
end
#poll(max_wait_time_ms = @config.max_wait_time_ms) ⇒ Object
21 22 23 |
# File 'lib/racecar/consumer_set.rb', line 21
# Polls for a single message, waiting at most +max_wait_time_ms+.
#
# @return the message, or nil when batch_poll returned nothing
def poll(max_wait_time_ms = @config.max_wait_time_ms)
  messages = batch_poll(max_wait_time_ms, 1)
  messages.first
end
#resume(topic, partition) ⇒ Object
109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 |
# File 'lib/racecar/consumer_set.rb', line 109
# Resumes fetching from +topic+/+partition+.
#
# The owning consumer is looked up with find_consumer_by. If that finds none,
# the consumer/filtered_tpl pair recorded by #pause is used instead. When
# neither is available, an info message is logged and nil is returned.
# On success the pause record for the partition is removed, along with the
# topic entry once it has no paused partitions left.
def resume(topic, partition)
  consumer, filtered_tpl = find_consumer_by(topic, partition)

  # Check key? first: @paused_tpls has a default proc, so indexing an unknown
  # topic would insert an empty hash that is never removed on the early
  # return below.
  paused = @paused_tpls.key?(topic) && @paused_tpls[topic][partition]
  consumer, filtered_tpl = paused if !consumer && paused

  unless consumer
    @logger.info "Attempted to resume #{topic}/#{partition}, but we're not subscribed to it"
    return
  end

  consumer.resume(filtered_tpl)
  @paused_tpls[topic].delete(partition)
  @paused_tpls.delete(topic) if @paused_tpls[topic].empty?
end
#store_offset(message) ⇒ Object
51 52 53 54 55 56 57 58 59 |
# File 'lib/racecar/consumer_set.rb', line 51
# Stores +message+'s offset through the current consumer.
#
# An Rdkafka::RdkafkaError with code :state (-172) is logged as a warning
# (wrapped in ErroneousStateError for the message) and swallowed. Any other
# Rdkafka::RdkafkaError is re-raised unchanged.
#
# @param message the consumed message whose offset should be stored
# @return nil when the :state error was swallowed
# @raise [Rdkafka::RdkafkaError] for any error code other than :state
def store_offset(message)
  current.store_offset(message)
rescue Rdkafka::RdkafkaError => e
  if e.code == :state # -172
    @logger.warn "Attempted to store_offset, but we're not subscribed to it: #{ErroneousStateError.new(e)}"
    return
  end

  raise
end
#subscribe_all ⇒ Object
Subscribes to all topics eagerly, even if there are still messages elsewhere. Usually that is not needed, and Kafka might rebalance if topics are not polled frequently enough.
131 132 133 134 135 136 |
# File 'lib/racecar/consumer_set.rb', line 131
# Visits every subscription slot once, calling #current on each (which
# builds and subscribes that slot's consumer if it does not exist yet) and
# then advancing to the next slot.
#
# @return [Integer] the number of subscriptions
def subscribe_all
  slot_count = @config.subscriptions.size
  slot_count.times { current; select_next_consumer }
end