Class: Racecar::ConsumerSet

Inherits:
Object
  • Object
show all
Defined in:
lib/racecar/consumer_set.rb

Constant Summary collapse

MAX_POLL_TRIES = 10

Instance Method Summary collapse

Constructor Details

#initialize(config, logger, instrumenter = NullInstrumenter) ⇒ ConsumerSet

Returns a new instance of ConsumerSet.

Raises:

  • (ArgumentError)


7
8
9
10
11
12
13
14
15
16
17
18
19
# File 'lib/racecar/consumer_set.rb', line 7

# Builds an empty consumer set for the subscriptions in +config+.
#
# No Kafka consumer is created here; consumers are created lazily, one per
# subscription index, by #current.
#
# @param config provides +subscriptions+ (must be non-empty)
# @param logger used for informational/warning messages
# @param instrumenter receives instrumentation events (defaults to NullInstrumenter)
# @raise [ArgumentError] if +config.subscriptions+ is empty
def initialize(config, logger, instrumenter = NullInstrumenter)
  @config = config
  @logger = logger
  @instrumenter = instrumenter

  if @config.subscriptions.empty?
    raise ArgumentError, "Subscriptions must not be empty when subscribing"
  end

  subscription_count = @config.subscriptions.size

  # One slot per subscription; filled in on demand.
  @consumers = []
  # Endless round-robin cursor over subscription indexes 0...subscription_count.
  @consumer_id_iterator = (0...subscription_count).cycle

  @previous_retries = 0
  @last_poll_read_nil_message = false

  # topic => { partition => [consumer, filtered_tpl] }, as written by #pause.
  @paused_tpls = Hash.new { |tpls, topic| tpls[topic] = {} }
end

Instance Method Details

#batch_poll(max_wait_time_ms = @config.max_wait_time_ms, max_messages = @config.fetch_messages) ⇒ Object

batch_poll collects messages until any of the following occurs:

  • max_wait_time_ms time has passed

  • max_messages have been collected

  • a nil message was polled (end of topic, Kafka stalled, etc.)

The messages are from a single topic, but potentially from more than one partition.

Errors during polling are retried with exponential backoff. If an error occurs and there is no time left to back off and retry, the messages collected so far are returned and the retry happens on the next call.



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/racecar/consumer_set.rb', line 35

# Collects up to +max_messages+ messages, stopping early once the time budget
# of +max_wait_time_ms+ milliseconds is spent or a poll yields nil.
#
# @param max_wait_time_ms overall time budget for this call, in milliseconds
# @param max_messages upper bound on the number of messages returned
# @return [Array] the collected messages (possibly empty)
def batch_poll(max_wait_time_ms = @config.max_wait_time_ms, max_messages = @config.fetch_messages)
  started_at = Time.now
  budget_left_ms = max_wait_time_ms
  # Give maybe_select_next_consumer a chance to switch consumers before
  # polling; its rules live outside this method.
  maybe_select_next_consumer

  batch = []
  while budget_left_ms > 0 && batch.size < max_messages
    # The guard above uses the budget measured on the previous pass, so when
    # time runs out mid-loop one more poll is made with whatever
    # remaining_time_ms reports now.
    budget_left_ms = remaining_time_ms(max_wait_time_ms, started_at)
    polled = poll_with_retries(budget_left_ms)
    break if polled.nil?

    batch.push(polled)
  end

  batch
end

#close ⇒ Object



67
68
69
70
# File 'lib/racecar/consumer_set.rb', line 67

# Closes every consumer created so far, then forgets all recorded pauses.
def close
  each_subscribed { |consumer| consumer.close }
  @paused_tpls.clear
end

#commit ⇒ Object



61
62
63
64
65
# File 'lib/racecar/consumer_set.rb', line 61

# Commits offsets on every consumer created so far.
#
# Each commit goes through commit_rescue_no_offset, defined elsewhere in
# this class; judging by its name, it tolerates "no offset" errors (confirm
# against its definition).
def commit
  each_subscribed { |consumer| commit_rescue_no_offset(consumer) }
end

#current ⇒ Object



72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/racecar/consumer_set.rb', line 72

# Returns the consumer for the subscription at the round-robin cursor.
#
# On first use of a slot, it builds an Rdkafka consumer from
# rdkafka_config(current_subscription), wires a RebalanceListener to it in
# both directions, and subscribes it to the subscription's topic inside a
# 'join_group' instrumentation span. The consumer is stored in the slot only
# after subscribing succeeds, and later calls for the same slot return it.
def current
  slot = @consumer_id_iterator.peek
  existing = @consumers[slot]
  return existing if existing

  settings = rdkafka_config(current_subscription)
  kafka_config = Rdkafka::Config.new(settings)
  rebalance_listener = RebalanceListener.new(@config.consumer_class, @instrumenter)
  kafka_config.consumer_rebalance_listener = rebalance_listener

  fresh_consumer = kafka_config.consumer
  rebalance_listener.rdkafka_consumer = fresh_consumer

  @instrumenter.instrument('join_group') { fresh_consumer.subscribe(current_subscription.topic) }

  @consumers[slot] = fresh_consumer
end

#each_subscribed ⇒ Object (also known as: each)



87
88
89
90
91
92
93
# File 'lib/racecar/consumer_set.rb', line 87

# Iterates over every consumer created so far.
#
# With a block, yields each consumer and returns the underlying array.
# Without a block, returns an Enumerator over that same live array.
def each_subscribed(&block)
  @consumers.each(&block)
end

#pause(topic, partition, offset) ⇒ Object



95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/racecar/consumer_set.rb', line 95

# Pauses +partition+ of +topic+ and rewinds it to +offset+.
#
# The owning consumer comes from find_consumer_by. If none is found, an info
# line is logged and nil is returned. Otherwise the partition is paused,
# the consumer seeks to +offset+, and [consumer, tpl] is recorded in
# @paused_tpls so #resume can still find it later.
def pause(topic, partition, offset)
  owner, tpl = find_consumer_by(topic, partition)

  unless owner
    @logger.info "Attempted to pause #{topic}/#{partition}, but we're not subscribed to it"
    return
  end

  owner.pause(tpl)
  # seek is handed a message-like stand-in; presumably only topic/partition/
  # offset are read from it (confirm against the rdkafka version in use).
  seek_target = OpenStruct.new(topic: topic, partition: partition, offset: offset)
  owner.seek(seek_target)

  @paused_tpls[topic][partition] = [owner, tpl]
end

#poll(max_wait_time_ms = @config.max_wait_time_ms) ⇒ Object



21
22
23
# File 'lib/racecar/consumer_set.rb', line 21

# Polls for a single message, waiting at most +max_wait_time_ms+.
#
# Thin wrapper over #batch_poll with a batch size of one.
# @return the message, or nil if #batch_poll returned nothing
def poll(max_wait_time_ms = @config.max_wait_time_ms)
  single = batch_poll(max_wait_time_ms, 1)
  single[0]
end

#resume(topic, partition) ⇒ Object



109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/racecar/consumer_set.rb', line 109

# Resumes consumption of +partition+ of +topic+.
#
# The owner is looked up with find_consumer_by first. If that returns no
# consumer, the [consumer, tpl] pair recorded by #pause is used instead.
# If neither is available, an info line is logged and nil is returned.
#
# @paused_tpls auto-vivifies an empty hash for any topic it is indexed with,
# so it is read via #fetch here (which ignores the default block). This
# keeps a resume of an unknown or never-paused partition from leaving an
# empty per-topic entry behind.
def resume(topic, partition)
  consumer, filtered_tpl = find_consumer_by(topic, partition)
  paused_partitions = @paused_tpls.fetch(topic, nil)

  unless consumer
    paused_entry = paused_partitions && paused_partitions[partition]
    consumer, filtered_tpl = paused_entry if paused_entry
  end

  unless consumer
    @logger.info "Attempted to resume #{topic}/#{partition}, but we're not subscribed to it"
    return
  end

  consumer.resume(filtered_tpl)

  # Nothing was recorded for this topic, so there is no bookkeeping to undo.
  return unless paused_partitions

  paused_partitions.delete(partition)
  @paused_tpls.delete(topic) if paused_partitions.empty?
end

#store_offset(message) ⇒ Object



51
52
53
54
55
56
57
58
59
# File 'lib/racecar/consumer_set.rb', line 51

# Stores +message+'s offset on the current consumer.
#
# An Rdkafka error with code :state (librdkafka's erroneous-state error,
# -172) is logged as a warning and swallowed, returning nil. Every other
# Rdkafka error is re-raised unchanged.
def store_offset(message)
  current.store_offset(message)
rescue Rdkafka::RdkafkaError => e
  raise unless e.code == :state # -172

  @logger.warn "Attempted to store_offset, but we're not subscribed to it: #{ErroneousStateError.new(e)}"
  nil
end

#subscribe_all ⇒ Object

Subscribes to all topics eagerly, even if there are still messages to consume elsewhere. This is usually not needed, and Kafka might rebalance if topics are not polled frequently enough.



131
132
133
134
135
136
# File 'lib/racecar/consumer_set.rb', line 131

# Eagerly creates and subscribes a consumer for every subscription.
#
# Walks the round-robin cursor once around all subscriptions. At each
# position it touches #current, which creates and subscribes that
# position's consumer if needed, and then advances the cursor.
# @return the number of subscriptions, as returned by Integer#times
def subscribe_all
  subscription_count = @config.subscriptions.size
  subscription_count.times do
    current
    select_next_consumer
  end
end