Class: Kafka::OffsetManager

Inherits: Object
Defined in: lib/kafka/offset_manager.rb
Overview
Manages a consumer's position in partitions and figures out where to resume processing from.
Constant Summary

DEFAULT_RETENTION_TIME = 1440 * 60
  The default broker setting for offsets.retention.minutes is 1440.

Instance Method Summary
- #clear_offsets ⇒ nil
  Clear all stored offset information.
- #clear_offsets_excluding(excluded) ⇒ nil
  Clear stored offset information for all partitions except those specified in excluded.
- #commit_offsets(recommit = false) ⇒ nil
  Commit offsets of messages that have been marked as processed.
- #commit_offsets_if_necessary ⇒ nil
  Commit offsets if necessary, according to the offset commit policy specified when initializing the class.
- #initialize(cluster:, group:, fetcher:, logger:, commit_interval:, commit_threshold:, offset_retention_time:) ⇒ OffsetManager (constructor)
  A new instance of OffsetManager.
- #mark_as_processed(topic, partition, offset) ⇒ nil
  Mark a message as having been processed.
- #next_offset_for(topic, partition) ⇒ Integer
  Return the next offset that should be fetched for the specified partition.
- #seek_to(topic, partition, offset) ⇒ nil
  Move the consumer's position in the partition to the specified offset.
- #seek_to_default(topic, partition) ⇒ nil
  Move the consumer's position in the partition back to the configured default offset, either the first or latest in the partition.
- #set_default_offset(topic, default_offset) ⇒ nil
  Set the default offset for a topic.
Constructor Details
#initialize(cluster:, group:, fetcher:, logger:, commit_interval:, commit_threshold:, offset_retention_time:) ⇒ OffsetManager
Returns a new instance of OffsetManager.
# File 'lib/kafka/offset_manager.rb', line 12

def initialize(cluster:, group:, fetcher:, logger:, commit_interval:, commit_threshold:, offset_retention_time:)
  @cluster = cluster
  @group = group
  @fetcher = fetcher
  @logger = TaggedLogger.new(logger)
  @commit_interval = commit_interval
  @commit_threshold = commit_threshold

  @uncommitted_offsets = 0
  @processed_offsets = {}
  @default_offsets = {}
  @committed_offsets = nil
  @resolved_offsets = {}
  @last_commit = Time.now
  @last_recommit = nil
  @recommit_interval = (offset_retention_time || DEFAULT_RETENTION_TIME) / 2
end
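The last assignment is worth noting: the recommit interval is half of the offset retention time, so a keep-alive commit always lands well before the broker expires the group's stored offsets. A standalone sketch of that arithmetic (the method form here is illustrative; in the class it is a plain assignment):

```ruby
# Recommit at half the retention time so stored offsets never expire.
# DEFAULT_RETENTION_TIME mirrors the constant above (1440 minutes, in seconds).
DEFAULT_RETENTION_TIME = 1440 * 60

def recommit_interval(offset_retention_time)
  (offset_retention_time || DEFAULT_RETENTION_TIME) / 2
end

recommit_interval(nil)   # => 43200 (twelve hours)
recommit_interval(7200)  # => 3600
```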
Instance Method Details
#clear_offsets ⇒ nil
Clear all stored offset information.
# File 'lib/kafka/offset_manager.rb', line 160

def clear_offsets
  @processed_offsets.clear
  @resolved_offsets.clear

  # Clear the cached commits from the brokers.
  @committed_offsets = nil
end
#clear_offsets_excluding(excluded) ⇒ nil
Clear stored offset information for all partitions except those specified in excluded.
offset_manager.clear_offsets_excluding("my-topic" => [1, 2, 3])
# File 'lib/kafka/offset_manager.rb', line 174

def clear_offsets_excluding(excluded)
  # Clear all offsets that aren't in `excluded`.
  @processed_offsets.each do |topic, partitions|
    partitions.keep_if do |partition, _|
      excluded.fetch(topic, []).include?(partition)
    end
  end

  # Clear the cached commits from the brokers.
  @committed_offsets = nil
  @resolved_offsets.clear
end
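The filtering relies on Hash#keep_if. A minimal reproduction of that logic, with made-up topic names and offsets, behaves like this:

```ruby
# Keep only the partitions listed in `excluded`; everything else is cleared.
processed = { "my-topic" => { 0 => 10, 1 => 20, 2 => 30 } }
excluded  = { "my-topic" => [1, 2] }

processed.each do |topic, partitions|
  partitions.keep_if { |partition, _| excluded.fetch(topic, []).include?(partition) }
end

processed # => { "my-topic" => { 1 => 20, 2 => 30 } }
```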
#commit_offsets(recommit = false) ⇒ nil
Commit offsets of messages that have been marked as processed.
If recommit is set to true, the existing positions will also be committed even if no messages have been processed on a partition. This is done in order to avoid the offset information expiring in cases where messages are very rare -- it's essentially a keep-alive.
# File 'lib/kafka/offset_manager.rb', line 131

def commit_offsets(recommit = false)
  offsets = offsets_to_commit(recommit)
  unless offsets.empty?
    @logger.debug "Committing offsets#{recommit ? ' with recommit' : ''}: #{prettify_offsets(offsets)}"

    @group.commit_offsets(offsets)

    @last_commit = Time.now
    @last_recommit = Time.now if recommit

    @uncommitted_offsets = 0
    @committed_offsets = nil
  end
end
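As a rough illustration of the keep-alive timing: with the default retention, a recommit becomes due once half the retention time has elapsed since the last one. The exact check lives in a private method; this sketch only reproduces the arithmetic:

```ruby
recommit_interval = (1440 * 60) / 2  # half the default retention: 43_200 seconds
last_recommit = Time.now - 50_000    # pretend the last recommit was ~14 hours ago

# Due if we have never recommitted, or the interval has elapsed since we did.
due = last_recommit.nil? || (Time.now - last_recommit) >= recommit_interval
due # => true, since 50_000 >= 43_200
```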
#commit_offsets_if_necessary ⇒ nil
Commit offsets if necessary, according to the offset commit policy specified when initializing the class.
# File 'lib/kafka/offset_manager.rb', line 150

def commit_offsets_if_necessary
  recommit = recommit_timeout_reached?
  if recommit || commit_timeout_reached? || commit_threshold_reached?
    commit_offsets(recommit)
  end
end
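The policy predicates are private methods that read the instance state set up in the constructor. The standalone versions below are hypothetical reconstructions of their logic (parameterized here instead of reading @last_commit and @uncommitted_offsets), shown to illustrate that a value of 0 disables a check:

```ruby
# Hypothetical standalone versions of the private commit-policy checks.
def commit_timeout_reached?(seconds_since_last_commit, commit_interval)
  commit_interval != 0 && seconds_since_last_commit >= commit_interval
end

def commit_threshold_reached?(uncommitted_offsets, commit_threshold)
  commit_threshold != 0 && uncommitted_offsets >= commit_threshold
end

commit_timeout_reached?(12, 10)     # => true
commit_threshold_reached?(150, 100) # => true
commit_threshold_reached?(150, 0)   # => false (0 disables the threshold)
```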
#mark_as_processed(topic, partition, offset) ⇒ nil
Mark a message as having been processed.
When offsets are committed, the message's offset will be stored in Kafka so that we can resume from this point at a later time.
# File 'lib/kafka/offset_manager.rb', line 52

def mark_as_processed(topic, partition, offset)
  unless @group.assigned_to?(topic, partition)
    @logger.debug "Not marking #{topic}/#{partition}:#{offset} as processed for partition not assigned to this consumer."
    return
  end
  @processed_offsets[topic] ||= {}

  last_processed_offset = @processed_offsets[topic][partition] || -1
  if last_processed_offset > offset + 1
    @logger.debug "Not overwriting newer offset #{topic}/#{partition}:#{last_processed_offset - 1} with older #{offset}"
    return
  end

  @uncommitted_offsets += 1

  # The committed offset should always be the offset of the next message that the
  # application will read, thus adding one to the last message processed.
  @processed_offsets[topic][partition] = offset + 1
  @logger.debug "Marking #{topic}/#{partition}:#{offset} as processed"
end
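The +1 is the important detail: by Kafka convention, the committed value is the offset of the next message to read, not the last one processed. In isolation, with a made-up topic name:

```ruby
# Processing the message at offset 41 stores 42 as the committed position.
processed = {}
topic, partition, offset = "events", 0, 41

processed[topic] ||= {}
processed[topic][partition] = offset + 1

processed # => { "events" => { 0 => 42 } }
```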
#next_offset_for(topic, partition) ⇒ Integer
Return the next offset that should be fetched for the specified partition.
# File 'lib/kafka/offset_manager.rb', line 106

def next_offset_for(topic, partition)
  offset = @processed_offsets.fetch(topic, {}).fetch(partition) {
    committed_offset_for(topic, partition)
  }

  # A negative offset means that no offset has been committed, so we need to
  # resolve the default offset for the topic.
  if offset < 0
    resolve_offset(topic, partition)
  else
    # The next offset is the last offset.
    offset
  end
end
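The double fetch with a block fallback is how locally processed offsets take precedence over broker-side commits: the block only runs when no local position exists. A sketch with a stubbed committed offset (committed_stub stands in for the private committed_offset_for):

```ruby
processed = { "events" => { 0 => 42 } }
committed_stub = -1  # nothing committed broker-side for this partition

# Partition 0 has a local position; partition 1 falls back to the stub.
processed.fetch("events", {}).fetch(0) { committed_stub }  # => 42
processed.fetch("events", {}).fetch(1) { committed_stub }  # => -1, so the
                                                           #    default offset
                                                           #    must be resolved
```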
#seek_to(topic, partition, offset) ⇒ nil
Move the consumer's position in the partition to the specified offset.
# File 'lib/kafka/offset_manager.rb', line 94

def seek_to(topic, partition, offset)
  @processed_offsets[topic] ||= {}
  @processed_offsets[topic][partition] = offset

  @fetcher.seek(topic, partition, offset)
end
#seek_to_default(topic, partition) ⇒ nil
Move the consumer's position in the partition back to the configured default offset, either the first or latest in the partition.
# File 'lib/kafka/offset_manager.rb', line 79

def seek_to_default(topic, partition)
  # Remove any cached offset, in case things have changed broker-side.
  clear_resolved_offset(topic)

  offset = resolve_offset(topic, partition)

  seek_to(topic, partition, offset)
end
#set_default_offset(topic, default_offset) ⇒ nil
Set the default offset for a topic.
When the consumer is started for the first time, or in cases where it gets stuck and has to reset its position, it must start either with the earliest messages or with the latest, skipping to the very end of each partition.
# File 'lib/kafka/offset_manager.rb', line 39

def set_default_offset(topic, default_offset)
  @default_offsets[topic] = default_offset
end
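In ruby-kafka the default offset is typically given as :earliest or :latest; at the protocol level, Kafka's ListOffsets API expresses these as the logical offsets -2 and -1. A small sketch of that mapping (the hash name here is made up for illustration):

```ruby
# Logical offsets defined by the Kafka protocol: -2 for the earliest retained
# message, -1 for the latest (i.e. skip to the end of the partition).
LOGICAL_OFFSETS = { earliest: -2, latest: -1 }

LOGICAL_OFFSETS.fetch(:earliest) # => -2
LOGICAL_OFFSETS.fetch(:latest)   # => -1
```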